From 4a1e81098a77f489e89ea3ab2da73fb4907c3316 Mon Sep 17 00:00:00 2001 From: Michael Spector Date: Tue, 12 Aug 2014 13:04:13 +0300 Subject: [PATCH] improved logging --- .../mqtt/producer/MQTTProducerStep.java | 33 ++++++++++--------- .../messages/messages_en_US.properties | 4 +-- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerStep.java b/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerStep.java index 0656c6d..a11fe5a 100644 --- a/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerStep.java +++ b/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerStep.java @@ -67,8 +67,8 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) if (data.client == null) { String broker = environmentSubstitute(meta.getBroker()); try { - data.client = new MqttClient(broker, - environmentSubstitute(meta.getClientId())); + String clientId = environmentSubstitute(meta.getClientId()); + data.client = new MqttClient(broker, clientId); MqttConnectOptions connectOptions = new MqttConnectOptions(); connectOptions.setCleanSession(true); @@ -84,7 +84,8 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) } logBasic(Messages.getString( - "MQTTClientStep.CreateMQTTClient.Message", broker)); + "MQTTClientStep.CreateMQTTClient.Message", broker, + clientId)); data.client.connect(connectOptions); } catch (MqttException e) { @@ -145,10 +146,10 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) MqttMessage mqttMessage = new MqttMessage(message); mqttMessage.setQos(qos); + logBasic(Messages.getString("MQTTClientStep.Log.SendingData", + topic, Integer.toString(qos))); if (isRowLevel()) { - logRowlevel(Messages.getString( - "MQTTClientStep.Log.SendingData", topic, - data.inputFieldMeta.getString(r[data.inputFieldNr]))); + logRowlevel(data.inputFieldMeta.getString(r[data.inputFieldNr])); } try { data.client.publish(topic, mqttMessage); @@ -179,16 +180,18 @@ public void stopRunning(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { MQTTProducerData data = (MQTTProducerData) sdi; - try { - if (data.client.isConnected()) { - data.client.disconnect(); + if (data.client != null) { + try { + if (data.client.isConnected()) { + data.client.disconnect(); + } + data.client.close(); + data.client = null; + } catch (MqttException e) { + logError( + Messages.getString("MQTTClientStep.ErrorClosingMQTTClient.Message"), + e); } - data.client.close(); - data.client = null; - } catch (MqttException e) { - logError( - Messages.getString("MQTTClientStep.ErrorClosingMQTTClient.Message"), - e); } super.stopRunning(smi, sdi); } diff --git a/src/main/resources/com/ruckuswireless/pentaho/mqtt/producer/messages/messages_en_US.properties b/src/main/resources/com/ruckuswireless/pentaho/mqtt/producer/messages/messages_en_US.properties index 2787969..4959bed 100644 --- a/src/main/resources/com/ruckuswireless/pentaho/mqtt/producer/messages/messages_en_US.properties +++ b/src/main/resources/com/ruckuswireless/pentaho/mqtt/producer/messages/messages_en_US.properties @@ -1,10 +1,10 @@ -MQTTClientStep.CreateMQTTClient.Message=Connecting to MQTT broker\: {0} +MQTTClientStep.CreateMQTTClient.Message=Connecting to MQTT broker\: {0} as client ''{1}'' MQTTClientStep.WrongTimeoutValue.Message=Wrong connection timeout value\: {0}! MQTTClientStep.WrongQOSValue.Message=Wrong QoS value\: {0}! MQTTClientStep.ErrorCreateMQTTClient.Message=Error connecting to MQTT broker! MQTTClientStep.ErrorClosingMQTTClient.Message=Error closing MQTT connection! MQTTClientStep.ErrorPublishing.Message=Error publishing MQTT message! -MQTTClientStep.Log.SendingData=Sending data through MQTT topic ''{0}'' : {1} +MQTTClientStep.Log.SendingData=Sending data through MQTT topic ''{0}'' with QoS={1} MQTTClientStep.ErrorInStepRunning=Error running step \: {0} MQTTClientStep.Log.FieldNameIsNull=Input field name not specified\! MQTTClientStep.Log.CouldntFindField=Couldn''t find field ''{0}'' in input stream\!