Skip to content

Commit

Permalink
Fixed the problem of missing data for the parent header in each request.
Browse files Browse the repository at this point in the history
  • Loading branch information
maveme committed Nov 25, 2019
1 parent 7ea0816 commit 4b9faf8
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 25 deletions.
24 changes: 23 additions & 1 deletion bacata-core/src/main/java/communication/Header.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package communication;


import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

public class Header {

Expand Down Expand Up @@ -44,6 +46,26 @@ public Header(String session, String msgType, String version, String username, S
this.date = date;
this.msgId = msgId;
}

public Header(String session, String msgType, String version, String username) {
this.session = session;
this.msgType = msgType;
this.version = version;
this.username = username;

this.date = ZonedDateTime.now().format(DateTimeFormatter.ISO_INSTANT);
this.msgId = String.valueOf(UUID.randomUUID());
}

public Header(String msgType, Header parentHeader) {
this.session = parentHeader.getSession();
this.msgType = msgType;
this.version = parentHeader.getVersion();
this.username = parentHeader.getUsername();

this.date = ZonedDateTime.now().format(DateTimeFormatter.ISO_INSTANT);
this.msgId = String.valueOf(UUID.randomUUID());
}

// -----------------------------------------------------------------
// Methods
Expand Down
45 changes: 21 additions & 24 deletions bacata-core/src/main/java/server/JupyterServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ public void startServer() throws JsonSyntaxException, JsonIOException, FileNotFo

while (!Thread.currentThread().isInterrupted()) {
poller.poll();
if (poller.pollin(0)) {
Message message = getMessage(communication.getShellSocket());
statusUpdate(message.getHeader(), Status.BUSY);
processShellMessage(message);
statusUpdate(message.getHeader(), Status.IDLE);
}
if (poller.pollin(1)) {
Message message = getMessage(communication.getControlSocket());
processControlMessage(message);
Expand All @@ -130,16 +136,10 @@ public void startServer() throws JsonSyntaxException, JsonIOException, FileNotFo
sendMessage(communication.getControlSocket(), header, message.getHeader(), contentReply);
}
}
if (poller.pollin(3))
listenHeartbeatSocket();
if (poller.pollin(0)) {
Message message = getMessage(communication.getShellSocket());
statusUpdate(message.getHeader(), Status.BUSY);
processShellMessage(message);
statusUpdate(message.getHeader(), Status.IDLE);
}
if (poller.pollin(2))
getMessage(communication.getIOPubSocket());
if (poller.pollin(3))
listenHeartbeatSocket();
}

}
Expand All @@ -165,7 +165,7 @@ public Message getMessage(ZMQ.Socket socket) throws RuntimeException {
public void processControlMessage(Message message) {
switch (message.getHeader().getMsgType()) {
case "input_request":
System.out.println("");
System.out.println("input_request");
break;
default:
break;
Expand All @@ -175,29 +175,26 @@ public void processControlMessage(Message message) {


public void processShellMessage(Message message) {
Content content;
Content contentReply;
Header header;
String session = message.getHeader().getSession();
switch (message.getHeader().getMsgType()) {
Content content, contentReply;
Header header, parentHeader = message.getHeader(); // Parent header for the reply.
switch (parentHeader.getMsgType()) {
case MessageType.KERNEL_INFO_REQUEST:
// statusUpdate(message.getHeader(), Status.STARTING);
header = createHeader(session, MessageType.KERNEL_INFO_REPLY);
header = new Header(MessageType.KERNEL_INFO_REPLY, parentHeader);
contentReply = (ContentKernelInfoReply) processKernelInfoRequest(message);
sendMessage(communication.getShellSocket(), header, message.getHeader(), contentReply);
sendMessage(communication.getShellSocket(), header, parentHeader, contentReply);
break;
case MessageType.SHUTDOWN_REQUEST:
header = createHeader(session, MessageType.SHUTDOWN_REPLY);
header = new Header(MessageType.SHUTDOWN_REPLY, parentHeader);
content = parser.fromJson(message.getRawContent(), ContentShutdownRequest.class);
contentReply = (ContentShutdownReply) processShutdownRequest((ContentShutdownRequest) content);
closeAllSockets();
sendMessage(communication.getShellSocket(), header, message.getHeader(), contentReply);
sendMessage(communication.getShellSocket(), header, parentHeader, contentReply);
break;
case MessageType.IS_COMPLETE_REQUEST:
header = createHeader(message.getHeader().getSession(), MessageType.IS_COMPLETE_REPLY);
content = parser.fromJson(message.getRawContent(), ContentIsCompleteRequest.class);
contentReply = processIsCompleteRequest((ContentIsCompleteRequest) content);
sendMessage(communication.getShellSocket(),header , message.getHeader(), contentReply);
sendMessage(communication.getShellSocket(),header , parentHeader, contentReply);
break;
case MessageType.EXECUTE_REQUEST:
content = parser.fromJson(message.getRawContent(), ContentExecuteRequest.class);
Expand All @@ -207,10 +204,10 @@ public void processShellMessage(Message message) {
processHistoryRequest(message);
break;
case MessageType.COMPLETE_REQUEST:
header = createHeader(session, MessageType.COMPLETE_REPLY);
header = new Header(MessageType.COMPLETE_REPLY, parentHeader);
content = parser.fromJson(message.getRawContent(), ContentCompleteRequest.class);
contentReply = processCompleteRequest((ContentCompleteRequest) content);
sendMessage(getCommunication().getShellSocket(), header, message.getHeader(), contentReply);
sendMessage(getCommunication().getShellSocket(), header, parentHeader, contentReply);
break;
case MessageType.INSPECT_REQUEST:
break;
Expand Down Expand Up @@ -273,7 +270,7 @@ public void sendShellMessage(Header header, Content content ) {
public void heartbeatChannel() {
communication.getHeartbeatSocket().send(HEARTBEAT_MESSAGE);
}

public Header createHeader(String pSession, String pMessageType) {
String timestamp = ZonedDateTime.now().format(DateTimeFormatter.ISO_INSTANT);
String msgid = String.valueOf(UUID.randomUUID());
Expand All @@ -300,7 +297,7 @@ public Gson getParser() {
* This method updates the kernel status with the value received as a parameter.
*/
public void statusUpdate(Header parentHeader, String status) {
Header header = createHeader(parentHeader.getSession(), MessageType.STATUS);
Header header = new Header(parentHeader.getSession(), MessageType.STATUS, parentHeader.getVersion(), parentHeader.getUsername());
ContentStatus content = new ContentStatus(status);
sendMessage(communication.getIOPubSocket(), header, parentHeader, content);
}
Expand Down

0 comments on commit 4b9faf8

Please sign in to comment.