Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable DRPC request to be sent via HTTP and/or Thrift #733

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
50 changes: 41 additions & 9 deletions storm-core/src/clj/backtype/storm/daemon/drpc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
(:import [backtype.storm.daemon Shutdownable])
(:import [java.net InetAddress])
(:use [backtype.storm bootstrap config log])
(:use compojure.core)
(:use ring.middleware.reload)
(:use [ring.adapter.jetty :only [run-jetty]])
(:require [ring.util.response :as resp])
(:gen-class))

(bootstrap)
Expand Down Expand Up @@ -97,35 +101,63 @@
(.interrupt clear-thread))
)))

(defn handle-request [handler]
(fn [request]
(handler request)))

(defn webapp [handler]
(->(def http-routes
(routes
(GET "/drpc/:func/:args" [:as {cookies :cookies} func args & m]
(.execute handler func args))
(GET "/drpc/:func/" [:as {cookies :cookies} func & m]
(.execute handler func ""))
(GET "/drpc/:func" [:as {cookies :cookies} func & m]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is in here twice by accident.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops I missed the trailing '/'. Looks like jetty is not handling '/' characters very well.

(.execute handler func ""))))
(wrap-reload '[backtype.storm.daemon.drpc])
handle-request))

(defn launch-server!
([]
(let [conf (read-storm-config)
worker-threads (int (conf DRPC-WORKER-THREADS))
worker-threads (int (conf DRPC-WORKER-THREADS))
queue-size (int (conf DRPC-QUEUE-SIZE))
service-handler (service-handler)
drpc-http-port (if (conf DRPC-HTTP-PORT) (int (conf DRPC-HTTP-PORT)) 0)
drpc-port (int (conf DRPC-PORT))
drpc-service-handler (service-handler)
;; requests and returns need to be on separate thread pools, since calls to
;; "execute" don't unblock until other thrift methods are called. So if
;; 64 threads are calling execute, the server won't accept the result
;; invocations that will unblock those threads
handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT)))
handler-server (if (> drpc-port 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot turn off Thrift unless we provide HTTP APIs for the spout/return bolt, and update them to know how to use the HTTP APIs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thrift port will be turned off only if drpc server was launched with configuration stating so. As the default, it will be enabled.

(THsHaServer. (-> (TNonblockingServerSocket. drpc-port)
(THsHaServer$Args.)
(.workerThreads 64)
(.executorService (ThreadPoolExecutor. worker-threads worker-threads
(.executorService (ThreadPoolExecutor. worker-threads worker-threads
60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size)))
(.protocolFactory (TBinaryProtocol$Factory.))
(.processor (DistributedRPC$Processor. service-handler))
(.processor (DistributedRPC$Processor. drpc-service-handler))
))
)
invoke-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-INVOCATIONS-PORT)))
(THsHaServer$Args.)
(.workerThreads 64)
(.protocolFactory (TBinaryProtocol$Factory.))
(.processor (DistributedRPCInvocations$Processor. service-handler))
(.processor (DistributedRPCInvocations$Processor. drpc-service-handler))
))]

(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop handler-server) (.stop invoke-server))))
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn []
(if handler-server (.stop handler-server))
(.stop invoke-server))))
(log-message "Starting Distributed RPC servers...")
(future (.serve invoke-server))
(.serve handler-server))))
(if (> drpc-http-port 0)
(run-jetty (webapp drpc-service-handler)
{:port drpc-http-port :join? false})
)
(if handler-server
(.serve handler-server)
)
)))

(defn -main []
(launch-server!))
6 changes: 6 additions & 0 deletions storm-core/src/jvm/backtype/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,12 @@ public class Config extends HashMap<String, Object> {
public static final String DRPC_SERVERS = "drpc.servers";
public static final Object DRPC_SERVERS_SCHEMA = ConfigValidation.StringsValidator;

/**
* This port is used by Storm DRPC for receiving HTTP DPRC requests from clients.
*/
public static final String DRPC_HTTP_PORT = "drpc.http.port";
public static final Object DRPC_HTTP_PORT_SCHEMA = Number.class;

/**
* This port is used by Storm DRPC for receiving DPRC requests from clients.
*/
Expand Down