From 5cd5986928ced02e7af340d2442636a5de28d78b Mon Sep 17 00:00:00 2001 From: Michael Spector Date: Mon, 11 Aug 2014 11:16:52 +0300 Subject: [PATCH] version 0.1 --- .../mqtt/producer/MQTTProducerData.java | 20 + .../mqtt/producer/MQTTProducerDialog.java | 342 ++++++++++++++++++ .../mqtt/producer/MQTTProducerMeta.java | 268 ++++++++++++++ .../mqtt/producer/MQTTProducerStep.java | 198 ++++++++++ .../pentaho/mqtt/producer/Messages.java | 36 ++ .../messages/messages_en_US.properties | 29 ++ 6 files changed, 893 insertions(+) create mode 100644 src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerData.java create mode 100644 src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerDialog.java create mode 100644 src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerMeta.java create mode 100644 src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerStep.java create mode 100644 src/main/java/com/ruckuswireless/pentaho/mqtt/producer/Messages.java create mode 100644 src/main/resources/com/ruckuswireless/pentaho/mqtt/producer/messages/messages_en_US.properties diff --git a/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerData.java b/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerData.java new file mode 100644 index 0000000..5b5e27f --- /dev/null +++ b/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerData.java @@ -0,0 +1,20 @@ +package com.ruckuswireless.pentaho.mqtt.producer; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.core.row.ValueMetaInterface; +import org.pentaho.di.trans.step.BaseStepData; +import org.pentaho.di.trans.step.StepDataInterface; + +/** + * Holds data processed by this step + * + * @author Michael + */ +public class MQTTProducerData extends BaseStepData implements StepDataInterface { + + MqttClient client; + RowMetaInterface outputRowMeta; + int inputFieldNr; + ValueMetaInterface inputFieldMeta; +} diff --git a/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerDialog.java b/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerDialog.java new file mode 100644 index 0000000..37d3888 --- /dev/null +++ b/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerDialog.java @@ -0,0 +1,342 @@ +package com.ruckuswireless.pentaho.mqtt.producer; + +import org.eclipse.swt.SWT; +import org.eclipse.swt.custom.CCombo; +import org.eclipse.swt.events.ModifyEvent; +import org.eclipse.swt.events.ModifyListener; +import org.eclipse.swt.events.SelectionAdapter; +import org.eclipse.swt.events.SelectionEvent; +import org.eclipse.swt.events.ShellAdapter; +import org.eclipse.swt.events.ShellEvent; +import org.eclipse.swt.layout.FormAttachment; +import org.eclipse.swt.layout.FormData; +import org.eclipse.swt.layout.FormLayout; +import org.eclipse.swt.widgets.Button; +import org.eclipse.swt.widgets.Control; +import org.eclipse.swt.widgets.Display; +import org.eclipse.swt.widgets.Event; +import org.eclipse.swt.widgets.Label; +import org.eclipse.swt.widgets.Listener; +import org.eclipse.swt.widgets.Shell; +import org.eclipse.swt.widgets.Text; +import org.pentaho.di.core.Const; +import org.pentaho.di.core.exception.KettleStepException; +import org.pentaho.di.core.row.RowMeta; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.i18n.BaseMessages; +import org.pentaho.di.trans.TransMeta; +import org.pentaho.di.trans.step.BaseStepMeta; +import org.pentaho.di.trans.step.StepDialogInterface; +import org.pentaho.di.ui.core.dialog.ErrorDialog; +import org.pentaho.di.ui.core.widget.TextVar; +import org.pentaho.di.ui.trans.step.BaseStepDialog; + +/** + * UI for the MQTT Client step + * + * @author Michael Spector + */ +public class MQTTProducerDialog extends BaseStepDialog implements + StepDialogInterface { + + private MQTTProducerMeta producerMeta; + private TextVar wBroker; + private TextVar wTopicName; + private TextVar wClientID; + private TextVar wTimeout; + private TextVar wQOS; + private CCombo wInputField; + + public MQTTProducerDialog(Shell parent, Object in, TransMeta tr, String sname) { + super(parent, (BaseStepMeta) in, tr, sname); + producerMeta = (MQTTProducerMeta) in; + } + + public MQTTProducerDialog(Shell parent, BaseStepMeta baseStepMeta, + TransMeta transMeta, String stepname) { + super(parent, baseStepMeta, transMeta, stepname); + producerMeta = (MQTTProducerMeta) baseStepMeta; + } + + public MQTTProducerDialog(Shell parent, int nr, BaseStepMeta in, TransMeta tr) { + super(parent, nr, in, tr); + producerMeta = (MQTTProducerMeta) in; + } + + public String open() { + Shell parent = getParent(); + Display display = parent.getDisplay(); + + shell = new Shell(parent, SWT.DIALOG_TRIM | SWT.RESIZE | SWT.MIN + | SWT.MAX); + props.setLook(shell); + setShellImage(shell, producerMeta); + + ModifyListener lsMod = new ModifyListener() { + public void modifyText(ModifyEvent e) { + producerMeta.setChanged(); + } + }; + changed = producerMeta.hasChanged(); + + FormLayout formLayout = new FormLayout(); + formLayout.marginWidth = Const.FORM_MARGIN; + formLayout.marginHeight = Const.FORM_MARGIN; + + shell.setLayout(formLayout); + shell.setText(Messages.getString("MQTTClientDialog.Shell.Title")); + + int middle = props.getMiddlePct(); + int margin = Const.MARGIN; + + // Step name + wlStepname = new Label(shell, SWT.RIGHT); + wlStepname.setText(Messages + .getString("MQTTClientDialog.StepName.Label")); + props.setLook(wlStepname); + fdlStepname = new FormData(); + fdlStepname.left = new FormAttachment(0, 0); + fdlStepname.right = new FormAttachment(middle, -margin); + fdlStepname.top = new FormAttachment(0, margin); + wlStepname.setLayoutData(fdlStepname); + wStepname = new Text(shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER); + props.setLook(wStepname); + wStepname.addModifyListener(lsMod); + fdStepname = new FormData(); + fdStepname.left = new FormAttachment(middle, 0); + fdStepname.top = new FormAttachment(0, margin); + fdStepname.right = new FormAttachment(100, 0); + wStepname.setLayoutData(fdStepname); + Control lastControl = wStepname; + + // Broker URL + Label wlBroker = new Label(shell, SWT.RIGHT); + wlBroker.setText(Messages.getString("MQTTClientDialog.Broker.Label")); + props.setLook(wlBroker); + FormData fdlBroker = new FormData(); + fdlBroker.top = new FormAttachment(lastControl, margin); + fdlBroker.left = new FormAttachment(0, 0); + fdlBroker.right = new FormAttachment(middle, -margin); + wlBroker.setLayoutData(fdlBroker); + wBroker = new TextVar(transMeta, shell, SWT.SINGLE | SWT.LEFT + | SWT.BORDER); + props.setLook(wBroker); + wBroker.addModifyListener(lsMod); + FormData fdBroker = new FormData(); + fdBroker.top = new FormAttachment(lastControl, margin); + fdBroker.left = new FormAttachment(middle, 0); + fdBroker.right = new FormAttachment(100, 0); + wBroker.setLayoutData(fdBroker); + lastControl = wBroker; + + // Topic name + Label wlTopicName = new Label(shell, SWT.RIGHT); + wlTopicName.setText(Messages + .getString("MQTTClientDialog.TopicName.Label")); + props.setLook(wlTopicName); + FormData fdlTopicName = new FormData(); + fdlTopicName.top = new FormAttachment(lastControl, margin); + fdlTopicName.left = new FormAttachment(0, 0); + fdlTopicName.right = new FormAttachment(middle, -margin); + wlTopicName.setLayoutData(fdlTopicName); + wTopicName = new TextVar(transMeta, shell, SWT.SINGLE | SWT.LEFT + | SWT.BORDER); + props.setLook(wTopicName); + wTopicName.addModifyListener(lsMod); + FormData fdTopicName = new FormData(); + fdTopicName.top = new FormAttachment(lastControl, margin); + fdTopicName.left = new FormAttachment(middle, 0); + fdTopicName.right = new FormAttachment(100, 0); + wTopicName.setLayoutData(fdTopicName); + lastControl = wTopicName; + + // Client ID + Label wlClientID = new Label(shell, SWT.RIGHT); + wlClientID.setText(Messages + .getString("MQTTClientDialog.ClientID.Label")); + props.setLook(wlClientID); + FormData fdlClientID = new FormData(); + fdlClientID.top = new FormAttachment(lastControl, margin); + fdlClientID.left = new FormAttachment(0, 0); + fdlClientID.right = new FormAttachment(middle, -margin); + wlClientID.setLayoutData(fdlClientID); + wClientID = new TextVar(transMeta, shell, SWT.SINGLE | SWT.LEFT + | SWT.BORDER); + props.setLook(wClientID); + wClientID.addModifyListener(lsMod); + FormData fdClientID = new FormData(); + fdClientID.top = new FormAttachment(lastControl, margin); + fdClientID.left = new FormAttachment(middle, 0); + fdClientID.right = new FormAttachment(100, 0); + wClientID.setLayoutData(fdClientID); + lastControl = wClientID; + + // Connection timeout + Label wlConnectionTimeout = new Label(shell, SWT.RIGHT); + wlConnectionTimeout.setText(Messages + .getString("MQTTClientDialog.ConnectionTimeout.Label")); + props.setLook(wlConnectionTimeout); + FormData fdlConnectionTimeout = new FormData(); + fdlConnectionTimeout.top = new FormAttachment(lastControl, margin); + fdlConnectionTimeout.left = new FormAttachment(0, 0); + fdlConnectionTimeout.right = new FormAttachment(middle, -margin); + wlConnectionTimeout.setLayoutData(fdlConnectionTimeout); + wTimeout = new TextVar(transMeta, shell, SWT.SINGLE | SWT.LEFT + | SWT.BORDER); + props.setLook(wTimeout); + wTimeout.addModifyListener(lsMod); + FormData fdConnectionTimeout = new FormData(); + fdConnectionTimeout.top = new FormAttachment(lastControl, margin); + fdConnectionTimeout.left = new FormAttachment(middle, 0); + fdConnectionTimeout.right = new FormAttachment(100, 0); + wTimeout.setLayoutData(fdConnectionTimeout); + lastControl = wTimeout; + + // QOS + Label wlQOS = new Label(shell, SWT.RIGHT); + wlQOS.setText(Messages.getString("MQTTClientDialog.QOS.Label")); + props.setLook(wlQOS); + FormData fdlQOS = new FormData(); + fdlQOS.top = new FormAttachment(lastControl, margin); + fdlQOS.left = new FormAttachment(0, 0); + fdlQOS.right = new FormAttachment(middle, -margin); + wlQOS.setLayoutData(fdlQOS); + wQOS = new TextVar(transMeta, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER); + props.setLook(wQOS); + wQOS.addModifyListener(lsMod); + FormData fdQOS = new FormData(); + fdQOS.top = new FormAttachment(lastControl, margin); + fdQOS.left = new FormAttachment(middle, 0); + fdQOS.right = new FormAttachment(100, 0); + wQOS.setLayoutData(fdQOS); + lastControl = wQOS; + + // Input field + RowMetaInterface previousFields; + try { + previousFields = transMeta.getPrevStepFields(stepMeta); + } catch (KettleStepException e) { + new ErrorDialog( + shell, + BaseMessages.getString("System.Dialog.Error.Title"), + Messages.getString("MQTTClientDialog.ErrorDialog.UnableToGetInputFields.Message"), + e); + previousFields = new RowMeta(); + } + Label wlInputField = new Label(shell, SWT.RIGHT); + wlInputField.setText(Messages + .getString("MQTTClientDialog.FieldName.Label")); + props.setLook(wlInputField); + FormData fdlInputField = new FormData(); + fdlInputField.top = new FormAttachment(lastControl, margin); + fdlInputField.left = new FormAttachment(0, 0); + fdlInputField.right = new FormAttachment(middle, -margin); + wlInputField.setLayoutData(fdlInputField); + wInputField = new CCombo(shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER); + wInputField.setItems(previousFields.getFieldNames()); + props.setLook(wInputField); + wInputField.addModifyListener(lsMod); + FormData fdFilename = new FormData(); + fdFilename.top = new FormAttachment(lastControl, margin); + fdFilename.left = new FormAttachment(middle, 0); + fdFilename.right = new FormAttachment(100, 0); + wInputField.setLayoutData(fdFilename); + lastControl = wInputField; + + // Buttons + wOK = new Button(shell, SWT.PUSH); + wOK.setText(BaseMessages.getString("System.Button.OK")); //$NON-NLS-1$ + wCancel = new Button(shell, SWT.PUSH); + wCancel.setText(BaseMessages.getString("System.Button.Cancel")); //$NON-NLS-1$ + + setButtonPositions(new Button[] { wOK, wCancel }, margin, null); + + // Add listeners + lsCancel = new Listener() { + public void handleEvent(Event e) { + cancel(); + } + }; + lsOK = new Listener() { + public void handleEvent(Event e) { + ok(); + } + }; + wCancel.addListener(SWT.Selection, lsCancel); + wOK.addListener(SWT.Selection, lsOK); + + lsDef = new SelectionAdapter() { + public void widgetDefaultSelected(SelectionEvent e) { + ok(); + } + }; + wStepname.addSelectionListener(lsDef); + wTopicName.addSelectionListener(lsDef); + wInputField.addSelectionListener(lsDef); + + // Detect X or ALT-F4 or something that kills this window... + shell.addShellListener(new ShellAdapter() { + public void shellClosed(ShellEvent e) { + cancel(); + } + }); + + // Set the shell size, based upon previous time... + setSize(shell, 440, 350, true); + + getData(producerMeta, true); + producerMeta.setChanged(changed); + + shell.open(); + while (!shell.isDisposed()) { + if (!display.readAndDispatch()) { + display.sleep(); + } + } + return stepname; + } + + /** + * Copy information from the meta-data input to the dialog fields. + */ + private void getData(MQTTProducerMeta producerMeta, boolean copyStepname) { + if (copyStepname) { + wStepname.setText(stepname); + } + wBroker.setText(Const.NVL(producerMeta.getBroker(), "")); + wTopicName.setText(Const.NVL(producerMeta.getTopic(), "")); + wInputField.setText(Const.NVL(producerMeta.getField(), "")); + wClientID.setText(Const.NVL(producerMeta.getClientId(), "")); + wTimeout.setText(Const.NVL(producerMeta.getTimeout(), "10000")); + wQOS.setText(Const.NVL(producerMeta.getQoS(), "0")); + wStepname.selectAll(); + } + + private void cancel() { + stepname = null; + producerMeta.setChanged(changed); + dispose(); + } + + /** + * Copy information from the dialog fields to the meta-data input + */ + private void setData(MQTTProducerMeta producerMeta) { + producerMeta.setBroker(wBroker.getText()); + producerMeta.setTopic(wTopicName.getText()); + producerMeta.setField(wInputField.getText()); + producerMeta.setClientId(wClientID.getText()); + producerMeta.setTimeout(wTimeout.getText()); + producerMeta.setQoS(wQOS.getText()); + producerMeta.setChanged(); + } + + private void ok() { + if (Const.isEmpty(wStepname.getText())) { + return; + } + setData(producerMeta); + stepname = wStepname.getText(); + dispose(); + } +} diff --git a/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerMeta.java b/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerMeta.java new file mode 100644 index 0000000..e5ea289 --- /dev/null +++ b/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerMeta.java @@ -0,0 +1,268 @@ +package com.ruckuswireless.pentaho.mqtt.producer; + +import java.util.List; +import java.util.Map; + +import org.pentaho.di.core.CheckResult; +import org.pentaho.di.core.CheckResultInterface; +import org.pentaho.di.core.Counter; +import org.pentaho.di.core.database.DatabaseMeta; +import org.pentaho.di.core.exception.KettleException; +import org.pentaho.di.core.exception.KettleXMLException; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.core.xml.XMLHandler; +import org.pentaho.di.repository.ObjectId; +import org.pentaho.di.repository.Repository; +import org.pentaho.di.trans.Trans; +import org.pentaho.di.trans.TransMeta; +import org.pentaho.di.trans.step.BaseStepMeta; +import org.pentaho.di.trans.step.StepDataInterface; +import org.pentaho.di.trans.step.StepInterface; +import org.pentaho.di.trans.step.StepMeta; +import org.pentaho.di.trans.step.StepMetaInterface; +import org.w3c.dom.Node; + +/** + * MQTT Client step definitions and serializer to/from XML and to/from Kettle + * repository. + * + * @author Michael Spector + */ +public class MQTTProducerMeta extends BaseStepMeta implements StepMetaInterface { + + private String broker; + private String topic; + private String field; + private String clientId; + private String timeout = "10000"; + private String qos = "0"; + + /** + * @return Broker URL + */ + public String getBroker() { + return broker; + } + + /** + * @param broker + * Broker URL + */ + public void setBroker(String broker) { + this.broker = broker; + } + + /** + * @return MQTT topic name + */ + public String getTopic() { + return topic; + } + + /** + * @param topic + * MQTT topic name + */ + public void setTopic(String topic) { + this.topic = topic; + } + + /** + * @return Target field name in Kettle stream + */ + public String getField() { + return field; + } + + /** + * @param field + * Target field name in Kettle stream + */ + public void setField(String field) { + this.field = field; + } + + /** + * @return Client ID + */ + public String getClientId() { + return clientId; + } + + /** + * @param clientId + * Client ID + */ + public void setClientId(String clientId) { + this.clientId = clientId; + } + + /** + * @return Connection timeout + */ + public String getTimeout() { + return timeout; + } + + /** + * @param timeout + * Connection timeout + */ + public void setTimeout(String timeout) { + this.timeout = timeout; + } + + /** + * @return QoS to use + */ + public String getQoS() { + return qos; + } + + /** + * @param qos + * QoS to use + */ + public void setQoS(String qos) { + this.qos = qos; + } + + public void check(List remarks, TransMeta transMeta, + StepMeta stepMeta, RowMetaInterface prev, String input[], + String output[], RowMetaInterface info) { + + if (broker == null) { + remarks.add(new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, + Messages.getString("MQTTClientMeta.Check.InvalidBroker"), + stepMeta)); + } + if (topic == null) { + remarks.add(new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, + Messages.getString("MQTTClientMeta.Check.InvalidTopic"), + stepMeta)); + } + if (field == null) { + remarks.add(new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, + Messages.getString("MQTTClientMeta.Check.InvalidField"), + stepMeta)); + } + if (clientId == null) { + remarks.add(new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, + Messages.getString("MQTTClientMeta.Check.InvalidClientID"), + stepMeta)); + } + if (timeout == null) { + remarks.add(new CheckResult( + CheckResultInterface.TYPE_RESULT_ERROR, + Messages.getString("MQTTClientMeta.Check.InvalidConnectionTimeout"), + stepMeta)); + } + if (qos == null) { + remarks.add(new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, + Messages.getString("MQTTClientMeta.Check.InvalidQOS"), + stepMeta)); + } + } + + public StepInterface getStep(StepMeta stepMeta, + StepDataInterface stepDataInterface, int cnr, TransMeta transMeta, + Trans trans) { + return new MQTTProducerStep(stepMeta, stepDataInterface, cnr, transMeta, + trans); + } + + public StepDataInterface getStepData() { + return new MQTTProducerData(); + } + + public void loadXML(Node stepnode, List databases, + Map counters) throws KettleXMLException { + + try { + broker = XMLHandler.getTagValue(stepnode, "BROKER"); + topic = XMLHandler.getTagValue(stepnode, "TOPIC"); + field = XMLHandler.getTagValue(stepnode, "FIELD"); + clientId = XMLHandler.getTagValue(stepnode, "CLIENT_ID"); + timeout = XMLHandler.getTagValue(stepnode, "TIMEOUT"); + qos = XMLHandler.getTagValue(stepnode, "QOS"); + } catch (Exception e) { + throw new KettleXMLException( + Messages.getString("MQTTClientMeta.Exception.loadXml"), e); + } + } + + public String getXML() throws KettleException { + StringBuilder retval = new StringBuilder(); + if (broker != null) { + retval.append(" ").append( + XMLHandler.addTagValue("BROKER", broker)); + } + if (topic != null) { + retval.append(" ") + .append(XMLHandler.addTagValue("TOPIC", topic)); + } + if (field != null) { + retval.append(" ") + .append(XMLHandler.addTagValue("FIELD", field)); + } + if (clientId != null) { + retval.append(" ").append( + XMLHandler.addTagValue("CLIENT_ID", clientId)); + } + if (timeout != null) { + retval.append(" ").append( + XMLHandler.addTagValue("TIMEOUT", timeout)); + } + if (qos != null) { + retval.append(" ").append(XMLHandler.addTagValue("QOS", qos)); + } + return retval.toString(); + } + + public void readRep(Repository rep, ObjectId stepId, + List databases, Map counters) + throws KettleException { + try { + broker = rep.getStepAttributeString(stepId, "BROKER"); + topic = rep.getStepAttributeString(stepId, "TOPIC"); + field = rep.getStepAttributeString(stepId, "FIELD"); + clientId = rep.getStepAttributeString(stepId, "CLIENT_ID"); + timeout = rep.getStepAttributeString(stepId, "TIMEOUT"); + qos = rep.getStepAttributeString(stepId, "QOS"); + } catch (Exception e) { + throw new KettleException("MQTTClientMeta.Exception.loadRep", e); + } + } + + public void saveRep(Repository rep, ObjectId transformationId, + ObjectId stepId) throws KettleException { + try { + if (broker != null) { + rep.saveStepAttribute(transformationId, stepId, "BROKER", + broker); + } + if (topic != null) { + rep.saveStepAttribute(transformationId, stepId, "TOPIC", topic); + } + if (field != null) { + rep.saveStepAttribute(transformationId, stepId, "FIELD", field); + } + if (clientId != null) { + rep.saveStepAttribute(transformationId, stepId, "CLIENT_ID", + clientId); + } + if (timeout != null) { + rep.saveStepAttribute(transformationId, stepId, "TIMEOUT", + timeout); + } + if (qos != null) { + rep.saveStepAttribute(transformationId, stepId, "QOS", qos); + } + } catch (Exception e) { + throw new KettleException("MQTTClientMeta.Exception.saveRep", e); + } + } + + public void setDefault() { + } +} diff --git a/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerStep.java b/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerStep.java new file mode 100644 index 0000000..24c0c8e --- /dev/null +++ b/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/MQTTProducerStep.java @@ -0,0 +1,198 @@ +package com.ruckuswireless.pentaho.mqtt.producer; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttPersistenceException; +import org.pentaho.di.core.Const; +import org.pentaho.di.core.exception.KettleException; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.trans.Trans; +import org.pentaho.di.trans.TransMeta; +import org.pentaho.di.trans.step.BaseStep; +import org.pentaho.di.trans.step.StepDataInterface; +import org.pentaho.di.trans.step.StepInterface; +import org.pentaho.di.trans.step.StepMeta; +import org.pentaho.di.trans.step.StepMetaInterface; + +/** + * MQTT client step processor + * + * @author Michael Spector + */ +public class MQTTProducerStep extends BaseStep implements StepInterface { + + public MQTTProducerStep(StepMeta stepMeta, + StepDataInterface stepDataInterface, int copyNr, + TransMeta transMeta, Trans trans) { + super(stepMeta, stepDataInterface, copyNr, transMeta, trans); + } + + public boolean init(StepMetaInterface smi, StepDataInterface sdi) { + super.init(smi, sdi); + + MQTTProducerMeta meta = (MQTTProducerMeta) smi; + MQTTProducerData data = (MQTTProducerData) sdi; + + String broker = environmentSubstitute(meta.getBroker()); + try { + data.client = new MqttClient(broker, + environmentSubstitute(meta.getClientId())); + + MqttConnectOptions connectOptions = new MqttConnectOptions(); + connectOptions.setCleanSession(true); + + String timeout = environmentSubstitute(meta.getTimeout()); + try { + connectOptions.setConnectionTimeout(Integer.parseInt(timeout)); + } catch (NumberFormatException e) { + logError(Messages.getString( + "MQTTClientStep.WrongTimeoutValue.Message", timeout), e); + return false; + } + + logBasic(Messages.getString( + "MQTTClientStep.CreateMQTTClient.Message", broker)); + data.client.connect(connectOptions); + + } catch (MqttException e) { + logError(Messages.getString( + "MQTTClientStep.ErrorCreateMQTTClient.Message", broker), e); + return false; + } + return true; + } + + public void dispose(StepMetaInterface smi, StepDataInterface sdi) { + MQTTProducerData data = (MQTTProducerData) sdi; + if (data.client != null) { + try { + if (data.client.isConnected()) { + data.client.disconnect(); + } + data.client.close(); + data.client = null; + } catch (MqttException e) { + logError( + Messages.getString("MQTTClientStep.ErrorClosingMQTTClient.Message"), + e); + } + } + super.dispose(smi, sdi); + } + + public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) + throws KettleException { + Object[] r = getRow(); + if (r == null) { + setOutputDone(); + return false; + } + + MQTTProducerMeta meta = (MQTTProducerMeta) smi; + MQTTProducerData data = (MQTTProducerData) sdi; + + RowMetaInterface inputRowMeta = getInputRowMeta(); + + if (first) { + first = false; + data.outputRowMeta = getInputRowMeta().clone(); + meta.getFields(data.outputRowMeta, getStepname(), null, null, this); + + String inputField = environmentSubstitute(meta.getField()); + + int numErrors = 0; + if (Const.isEmpty(inputField)) { + logError(Messages + .getString("MQTTClientStep.Log.FieldNameIsNull")); //$NON-NLS-1$ + numErrors++; + } + data.inputFieldNr = inputRowMeta.indexOfValue(inputField); + if (data.inputFieldNr < 0) { + logError(Messages.getString( + "MQTTClientStep.Log.CouldntFindField", inputField)); //$NON-NLS-1$ + numErrors++; + } + if (!inputRowMeta.getValueMeta(data.inputFieldNr).isBinary()) { + logError(Messages.getString( + "MQTTClientStep.Log.FieldNotValid", inputField)); //$NON-NLS-1$ + numErrors++; + } + if (numErrors > 0) { + setErrors(numErrors); + stopAll(); + return false; + } + data.inputFieldMeta = inputRowMeta.getValueMeta(data.inputFieldNr); + } + + try { + byte[] message = data.inputFieldMeta + .getBinary(r[data.inputFieldNr]); + String topic = environmentSubstitute(meta.getTopic()); + + String qosValue = environmentSubstitute(meta.getQoS()); + int qos; + try { + qos = Integer.parseInt(qosValue); + if (qos != 0 && qos != 1 && qos != 2) { + throw new KettleException(Messages.getString( + "MQTTClientStep.WrongQOSValue.Message", qosValue)); + } + } catch (NumberFormatException e) { + throw new KettleException(Messages.getString( + "MQTTClientStep.WrongQOSValue.Message", qosValue), e); + } + + MqttMessage mqttMessage = new MqttMessage(message); + mqttMessage.setQos(qos); + + if (isRowLevel()) { + logRowlevel(Messages.getString( + "MQTTClientStep.Log.SendingData", topic, + data.inputFieldMeta.getString(r[data.inputFieldNr]))); + } + try { + data.client.publish(topic, mqttMessage); + } catch (MqttPersistenceException e) { + throw new KettleException( + Messages.getString("MQTTClientStep.ErrorPublishing.Message"), + e); + } catch (MqttException e) { + throw new KettleException( + Messages.getString("MQTTClientStep.ErrorPublishing.Message"), + e); + } + } catch (KettleException e) { + if (!getStepMeta().isDoingErrorHandling()) { + logError(Messages.getString( + "MQTTClientStep.ErrorInStepRunning", e.getMessage())); + setErrors(1); + stopAll(); + setOutputDone(); + return false; + } + putError(getInputRowMeta(), r, 1, e.toString(), null, getStepname()); + } + return true; + } + + public void stopRunning(StepMetaInterface smi, StepDataInterface sdi) + throws KettleException { + + MQTTProducerData data = (MQTTProducerData) sdi; + try { + if (data.client.isConnected()) { + data.client.disconnect(); + } + data.client.close(); + data.client = null; + } catch (MqttException e) { + logError( + Messages.getString("MQTTClientStep.ErrorClosingMQTTClient.Message"), + e); + } + super.stopRunning(smi, sdi); + } +} diff --git a/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/Messages.java b/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/Messages.java new file mode 100644 index 0000000..0fe8cc4 --- /dev/null +++ b/src/main/java/com/ruckuswireless/pentaho/mqtt/producer/Messages.java @@ -0,0 +1,36 @@ +package com.ruckuswireless.pentaho.mqtt.producer; + +import org.pentaho.di.i18n.BaseMessages; + +public class Messages { + public static final Class clazz = Messages.class; + + public static String getString(String key) { + return BaseMessages.getString(clazz, key); + } + + public static String getString(String key, String param1) { + return BaseMessages.getString(clazz, key, param1); + } + + public static String getString(String key, String param1, String param2) { + return BaseMessages.getString(clazz, key, param1, param2); + } + + public static String getString(String key, String param1, String param2, String param3) { + return BaseMessages.getString(clazz, key, param1, param2, param3); + } + + public static String getString(String key, String param1, String param2, String param3, String param4) { + return BaseMessages.getString(clazz, key, param1, param2, param3, param4); + } + + public static String getString(String key, String param1, String param2, String param3, String param4, String param5) { + return BaseMessages.getString(clazz, key, param1, param2, param3, param4, param5); + } + + public static String getString(String key, String param1, String param2, String param3, String param4, + String param5, String param6) { + return BaseMessages.getString(clazz, key, param1, param2, param3, param4, param5, param6); + } +} \ No newline at end of file diff --git a/src/main/resources/com/ruckuswireless/pentaho/mqtt/producer/messages/messages_en_US.properties b/src/main/resources/com/ruckuswireless/pentaho/mqtt/producer/messages/messages_en_US.properties new file mode 100644 index 0000000..2787969 --- /dev/null +++ b/src/main/resources/com/ruckuswireless/pentaho/mqtt/producer/messages/messages_en_US.properties @@ -0,0 +1,29 @@ +MQTTClientStep.CreateMQTTClient.Message=Connecting to MQTT broker\: {0} +MQTTClientStep.WrongTimeoutValue.Message=Wrong connection timeout value\: {0}! +MQTTClientStep.WrongQOSValue.Message=Wrong QoS value\: {0}! +MQTTClientStep.ErrorCreateMQTTClient.Message=Error connecting to MQTT broker! +MQTTClientStep.ErrorClosingMQTTClient.Message=Error closing MQTT connection! +MQTTClientStep.ErrorPublishing.Message=Error publishing MQTT message! +MQTTClientStep.Log.SendingData=Sending data through MQTT topic ''{0}'' : {1} +MQTTClientStep.ErrorInStepRunning=Error running step \: {0} +MQTTClientStep.Log.FieldNameIsNull=Input field name not specified\! +MQTTClientStep.Log.CouldntFindField=Couldn''t find field ''{0}'' in input stream\! +MQTTClientStep.Log.FieldNotValid=Input field ''{0}'' is not binary\! +MQTTClientMeta.Exception.loadXml=Unable to read step information from XML +MQTTClientMeta.Exception.loadRep=Unexpected error reading step information from the repository +MQTTClientMeta.Exception.saveRep=Unexpected error writing step information to the repository +MQTTClientMeta.Check.InvalidBroker=Broker URL must be set\! +MQTTClientMeta.Check.InvalidTopic=Topic name must be set\! +MQTTClientMeta.Check.InvalidField=Field name must be set\! +MQTTClientMeta.Check.InvalidClientID=Client ID must be set\! +MQTTClientMeta.Check.InvalidConnectionTimeout=Connection timeout must be set\! +MQTTClientMeta.Check.InvalidQOS=QoS must be set\! +MQTTClientDialog.Shell.Title=MQTT Client +MQTTClientDialog.StepName.Label=Step name +MQTTClientDialog.Broker.Label=Broker URL +MQTTClientDialog.TopicName.Label=Topic name +MQTTClientDialog.FieldName.Label=Input field name +MQTTClientDialog.ClientID.Label=Client ID +MQTTClientDialog.ConnectionTimeout.Label=Connection timeout +MQTTClientDialog.QOS.Label=QoS +MQTTClientDialog.ErrorDialog.UnableToGetInputFields=Unable to get input fields for this step\! \ No newline at end of file