diff --git a/bacata-core/src/main/java/communication/Header.java b/bacata-core/src/main/java/communication/Header.java index 8f0810b..ddc5d35 100644 --- a/bacata-core/src/main/java/communication/Header.java +++ b/bacata-core/src/main/java/communication/Header.java @@ -1,6 +1,8 @@ package communication; - +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.UUID; public class Header { @@ -44,6 +46,26 @@ public Header(String session, String msgType, String version, String username, S this.date = date; this.msgId = msgId; } + + public Header(String session, String msgType, String version, String username) { + this.session = session; + this.msgType = msgType; + this.version = version; + this.username = username; + + this.date = ZonedDateTime.now().format(DateTimeFormatter.ISO_INSTANT); + this.msgId = String.valueOf(UUID.randomUUID()); + } + + public Header(String msgType, Header parentHeader) { + this.session = parentHeader.getSession(); + this.msgType = msgType; + this.version = parentHeader.getVersion(); + this.username = parentHeader.getUsername(); + + this.date = ZonedDateTime.now().format(DateTimeFormatter.ISO_INSTANT); + this.msgId = String.valueOf(UUID.randomUUID()); + } // ----------------------------------------------------------------- // Methods diff --git a/bacata-core/src/main/java/server/JupyterServer.java b/bacata-core/src/main/java/server/JupyterServer.java index c98eb58..f15d258 100644 --- a/bacata-core/src/main/java/server/JupyterServer.java +++ b/bacata-core/src/main/java/server/JupyterServer.java @@ -120,6 +120,12 @@ public void startServer() throws JsonSyntaxException, JsonIOException, FileNotFo while (!Thread.currentThread().isInterrupted()) { poller.poll(); + if (poller.pollin(0)) { + Message message = getMessage(communication.getShellSocket()); + statusUpdate(message.getHeader(), Status.BUSY); + processShellMessage(message); + statusUpdate(message.getHeader(), Status.IDLE); + } if (poller.pollin(1)) { Message message = getMessage(communication.getControlSocket()); processControlMessage(message); @@ -130,16 +136,10 @@ public void startServer() throws JsonSyntaxException, JsonIOException, FileNotFo sendMessage(communication.getControlSocket(), header, message.getHeader(), contentReply); } } - if (poller.pollin(3)) - listenHeartbeatSocket(); - if (poller.pollin(0)) { - Message message = getMessage(communication.getShellSocket()); - statusUpdate(message.getHeader(), Status.BUSY); - processShellMessage(message); - statusUpdate(message.getHeader(), Status.IDLE); - } if (poller.pollin(2)) getMessage(communication.getIOPubSocket()); + if (poller.pollin(3)) + listenHeartbeatSocket(); } } @@ -165,7 +165,7 @@ public Message getMessage(ZMQ.Socket socket) throws RuntimeException { public void processControlMessage(Message message) { switch (message.getHeader().getMsgType()) { case "input_request": - System.out.println(""); + System.out.println("input_request"); break; default: break; @@ -175,29 +175,26 @@ public void processControlMessage(Message message) { public void processShellMessage(Message message) { - Content content; - Content contentReply; - Header header; - String session = message.getHeader().getSession(); - switch (message.getHeader().getMsgType()) { + Content content, contentReply; + Header header, parentHeader = message.getHeader(); // Parent header for the reply. + switch (parentHeader.getMsgType()) { case MessageType.KERNEL_INFO_REQUEST: -// statusUpdate(message.getHeader(), Status.STARTING); - header = createHeader(session, MessageType.KERNEL_INFO_REPLY); + header = new Header(MessageType.KERNEL_INFO_REPLY, parentHeader); contentReply = (ContentKernelInfoReply) processKernelInfoRequest(message); - sendMessage(communication.getShellSocket(), header, message.getHeader(), contentReply); + sendMessage(communication.getShellSocket(), header, parentHeader, contentReply); break; case MessageType.SHUTDOWN_REQUEST: - header = createHeader(session, MessageType.SHUTDOWN_REPLY); + header = new Header(MessageType.SHUTDOWN_REPLY, parentHeader); content = parser.fromJson(message.getRawContent(), ContentShutdownRequest.class); contentReply = (ContentShutdownReply) processShutdownRequest((ContentShutdownRequest) content); closeAllSockets(); - sendMessage(communication.getShellSocket(), header, message.getHeader(), contentReply); + sendMessage(communication.getShellSocket(), header, parentHeader, contentReply); break; case MessageType.IS_COMPLETE_REQUEST: header = createHeader(message.getHeader().getSession(), MessageType.IS_COMPLETE_REPLY); content = parser.fromJson(message.getRawContent(), ContentIsCompleteRequest.class); contentReply = processIsCompleteRequest((ContentIsCompleteRequest) content); - sendMessage(communication.getShellSocket(),header , message.getHeader(), contentReply); + sendMessage(communication.getShellSocket(),header , parentHeader, contentReply); break; case MessageType.EXECUTE_REQUEST: content = parser.fromJson(message.getRawContent(), ContentExecuteRequest.class); @@ -207,10 +204,10 @@ public void processShellMessage(Message message) { processHistoryRequest(message); break; case MessageType.COMPLETE_REQUEST: - header = createHeader(session, MessageType.COMPLETE_REPLY); + header = new Header(MessageType.COMPLETE_REPLY, parentHeader); content = parser.fromJson(message.getRawContent(), ContentCompleteRequest.class); contentReply = processCompleteRequest((ContentCompleteRequest) content); - sendMessage(getCommunication().getShellSocket(), header, message.getHeader(), contentReply); + sendMessage(getCommunication().getShellSocket(), header, parentHeader, contentReply); break; case MessageType.INSPECT_REQUEST: break; @@ -273,7 +270,7 @@ public void sendShellMessage(Header header, Content content ) { public void heartbeatChannel() { communication.getHeartbeatSocket().send(HEARTBEAT_MESSAGE); } - + public Header createHeader(String pSession, String pMessageType) { String timestamp = ZonedDateTime.now().format(DateTimeFormatter.ISO_INSTANT); String msgid = String.valueOf(UUID.randomUUID()); @@ -300,7 +297,7 @@ public Gson getParser() { * This method updates the kernel status with the value received as a parameter. */ public void statusUpdate(Header parentHeader, String status) { - Header header = createHeader(parentHeader.getSession(), MessageType.STATUS); + Header header = new Header(parentHeader.getSession(), MessageType.STATUS, parentHeader.getVersion(), parentHeader.getUsername()); ContentStatus content = new ContentStatus(status); sendMessage(communication.getIOPubSocket(), header, parentHeader, content); }