
Cannot replicate MQTT instructions to few workers #866

Open
f1faust opened this issue Jul 1, 2022 · 1 comment


f1faust commented Jul 1, 2022

Issue Guidelines

I use the MqttSourceConnector and would like to replicate a shared MQTT subscription to all workers. My configuration:
```
connect.mqtt.clean=true
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=4
connect.mqtt.kcql=INSERT INTO SESSION_CHAT_DATA_TOPIC SELECT * FROM $share/g/chat
connect.mqtt.password=***********
connect.mqtt.share.replicate=true
```

As a result, only one worker (task) is created. To work around this, I have to change the KCQL string to the following:
```
connect.mqtt.kcql=INSERT INTO SESSION_CHAT_DATA_TOPIC SELECT * FROM $share/g/chat;INSERT INTO SESSION_CHAT_DATA_TOPIC SELECT * FROM $share/g/chat
```

That is, I simply duplicate the KCQL statement with a `;` separator, which is not a good or proper solution.
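A rough way to see why the workaround changes the outcome: assuming the connector splits the `connect.mqtt.kcql` value on `;` into separate statements (an assumption, though it is consistent with `kcql.length` evaluating to 2 for the duplicated string), the single statement yields one entry and the duplicated string yields two. `KcqlSplitSketch` and `splitKcql` are hypothetical names used only for illustration, and the trimming and empty-entry filtering are choices made for this sketch, not the connector's parser.

```scala
// Hypothetical sketch, not the connector's parser.
// Assumption: connect.mqtt.kcql is split on ';' into individual KCQL statements.
object KcqlSplitSketch {
  // Split the raw setting on ';', trimming whitespace and dropping empty entries.
  def splitKcql(raw: String): Vector[String] =
    raw.split(";").iterator.map(_.trim).filter(_.nonEmpty).toVector

  // The statement from this issue, once and duplicated (the workaround).
  val single: String = "INSERT INTO SESSION_CHAT_DATA_TOPIC SELECT * FROM $share/g/chat"
  val duplicated: String = single + ";" + single
}
```

Here `splitKcql(single)` has length 1 and `splitKcql(duplicated)` has length 2, with both entries identical.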

Please review these questions before submitting an issue.

What version of the Stream Reactor are you reporting this issue for?

3.0.1

Have you read the docs?

yes

What is the expected behaviour?

I think that when `connect.mqtt.share.replicate=true`, a single KCQL statement such as `INSERT INTO SESSION_CHAT_DATA_TOPIC SELECT * FROM $share/g/chat` should be sufficient, and it should be replicated to all workers.
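One way the expected behaviour could look, sketched against a simplified, dependency-free model of the connector's `taskConfigs` logic: the single-task shortcut is only taken when no shared statement needs replicating. `TaskConfig`, `ExpectedTaskConfigs` and `isShared` are hypothetical stand-ins (a task config is reduced to a client id plus its KCQL statements, and a statement counts as shared when it reads `FROM $share/...`); this is an illustration under those assumptions, not a patch against the real connector.

```scala
// Hypothetical model: a task config is just a client id plus its KCQL statements.
final case class TaskConfig(clientId: String, kcql: Vector[String])

object ExpectedTaskConfigs {
  // Hypothetical predicate: treat statements reading from "$share/..." as shared.
  def isShared(kcql: String): Boolean = kcql.contains("FROM $share/")

  def taskConfigs(kcql: Vector[String], maxTasks: Int, replicateShared: Boolean,
                  clientId: String): List[TaskConfig] = {
    val mustReplicate = replicateShared && kcql.exists(isShared)
    if (maxTasks == 1 || (kcql.length == 1 && !mustReplicate)) {
      // Nothing to fan out: keep the original single configuration.
      List(TaskConfig(clientId, kcql))
    } else {
      // Same grouping arithmetic as the connector's taskConfigs.
      val groups = kcql.length / maxTasks + kcql.length % maxTasks
      // Shared statements go to every task; the rest are split into groups.
      val (replicated, distributed) =
        if (replicateShared) kcql.partition(isShared) else (Vector.empty[String], kcql)
      Vector.fill(maxTasks)(replicated)
        .zipAll(distributed.grouped(groups).toVector, Vector.empty[String], Vector.empty[String])
        .map { case (rep, dist) => dist ++ rep }
        .filter(_.nonEmpty)
        .zipWithIndex
        .map { case (statements, index) => TaskConfig(s"$clientId-$index", statements) }
        .toList
    }
  }
}
```

For example, `ExpectedTaskConfigs.taskConfigs(Vector(stmt), 4, replicateShared = true, "client")` returns four configurations, each holding `stmt`, with client ids `client-0` to `client-3`; with `replicateShared = false` it returns the single original configuration.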

@LarissaVerhuelsdonk

This still reproduces with v6.0.3.

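Based on @f1faust's description, there may be a bug in `io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector.scala`. Duplicating the KCQL statement seems to be required to reach the else branch of `if (maxTasks == 1 || kcql.length == 1)` on line 52 (the 4th line of the snippet), since `kcql.length` evaluates to 2 in that case. With a single statement, `kcql.length == 1` short-circuits to `Collections.singletonList(configProps)`, so `replicateShared` is never consulted: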

```scala
override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = {
  val settings = MqttSourceSettings(MqttSourceConfig(configProps.asScala.toMap))
  val kcql     = settings.kcql
  if (maxTasks == 1 || kcql.length == 1) {
    Collections.singletonList(configProps)
  } else {
    val groups = kcql.length / maxTasks + kcql.length % maxTasks

    // If the option is enabled, copy every KCQL instruction with a shared subscription to every tasks, otherwise
    // the shared subscriptions are distributed as every other instructions.
    val (replicated, distributed) =
      if (settings.replicateShared) kcql.partition(shouldReplicate) else (Array[String](), kcql)
    val configs = Array.fill(maxTasks)(replicated)
      .zipAll(distributed.grouped(groups).toList, Array[String](), Array[String]())
      .map(z => z._2 ++ z._1)
      .filter(_.nonEmpty)
      .zipWithIndex
      .map {
        case (p, index) =>
          val map = settings.copy(kcql = p, clientId = settings.clientId + "-" + index).asMap()
          configProps.asScala
            .filterNot { case (k, _) => map.containsKey(k) }
            .foreach { case (k, v) => map.put(k, v) }
          map
      }
    configs.toList.asJava
  }
}
```
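To check this suspicion without running Kafka Connect, the branching above can be reproduced in plain Scala. This is a simplified model under stated assumptions: a task config is reduced to a client id plus its KCQL statements, and `isShared` is a hypothetical stand-in for the connector's `shouldReplicate` that treats any statement reading `FROM $share/...` as shared. `TaskConfig` and `CurrentTaskConfigs` are illustrative names only.

```scala
// Hypothetical model: a task config is just a client id plus its KCQL statements.
final case class TaskConfig(clientId: String, kcql: Vector[String])

object CurrentTaskConfigs {
  // Hypothetical stand-in for the connector's shouldReplicate predicate.
  def isShared(kcql: String): Boolean = kcql.contains("FROM $share/")

  def taskConfigs(kcql: Vector[String], maxTasks: Int, replicateShared: Boolean,
                  clientId: String): List[TaskConfig] =
    if (maxTasks == 1 || kcql.length == 1) {
      // Early return: replicateShared is never consulted on this path.
      List(TaskConfig(clientId, kcql))
    } else {
      val groups = kcql.length / maxTasks + kcql.length % maxTasks
      val (replicated, distributed) =
        if (replicateShared) kcql.partition(isShared) else (Vector.empty[String], kcql)
      Vector.fill(maxTasks)(replicated)
        .zipAll(distributed.grouped(groups).toVector, Vector.empty[String], Vector.empty[String])
        .map { case (rep, dist) => dist ++ rep } // mirrors z._2 ++ z._1
        .filter(_.nonEmpty)
        .zipWithIndex
        .map { case (statements, index) => TaskConfig(s"$clientId-$index", statements) }
        .toList
    }
}
```

In this model a single shared statement with `maxTasks = 4` returns one configuration with the unchanged client id, while the duplicated statement returns four configurations (`client-0` to `client-3`), each containing the statement twice. If the real connector behaves the same way, each task would receive the same statement twice under the workaround.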

I'm not sure I'll find the time to verify my suspicion or commit a fix in the short term, but I felt it was worth sharing nonetheless.
