Skip to content

Commit

Permalink
Solace Read connector: UnboundedSource and UnboundedReader (#31636)
Browse files Browse the repository at this point in the history
* Add interfaces for broker-helper classes

* Add UnboundedSolaceReader and UnboundedSolaceSource
  • Loading branch information
bzablocki authored Jun 18, 2024
1 parent ef88539 commit 96b9de0
Show file tree
Hide file tree
Showing 9 changed files with 523 additions and 3 deletions.
3 changes: 3 additions & 0 deletions sdks/java/io/solace/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ dependencies {
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation library.java.solace
implementation project(":sdks:java:extensions:avro")
implementation library.java.avro
permitUnusedDeclared library.java.avro
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.IOException;

/**
* Interface for receiving messages from a Solace broker.
*
* <p>Implementations of this interface are responsible for managing the connection to the broker
* and for receiving messages from the broker.
*/
public interface MessageReceiver {
/**
* Starts the message receiver.
*
* <p>This method is called in the {@link
* org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader#start()} method.
*/
void start();

/**
* Returns {@literal true} if the message receiver is closed, {@literal false} otherwise.
*
* <p>A message receiver is closed when it is no longer able to receive messages.
*/
boolean isClosed();

/**
* Receives a message from the broker.
*
* <p>This method will block until a message is received.
*/
BytesXMLMessage receive() throws IOException;

/**
* Test clients may return {@literal true} to signal that all expected messages have been pulled
* and the test may complete. Real clients should always return {@literal false}.
*/
default boolean isEOF() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.solacesystems.jcsmp.Queue;
import java.io.IOException;
import java.io.Serializable;

/**
* This interface defines methods for interacting with a Solace message broker using the Solace
* Element Management Protocol (SEMP). SEMP provides a way to manage and monitor various aspects of
* the broker, including queues and topics.
*/
public interface SempClient extends Serializable {

/**
* Determines if the specified queue is non-exclusive. In Solace, non-exclusive queues allow
* multiple consumers to receive messages from the queue.
*/
boolean isQueueNonExclusive(String queueName) throws IOException;

/**
* This is only called when a user requests to read data from a topic. This method creates a new
* queue on the Solace broker and associates it with the specified topic. This ensures that
* messages published to the topic are delivered to the queue, allowing consumers to receive them.
*/
Queue createQueueForTopic(String queueName, String topicName) throws IOException;

/**
* Retrieves the size of the backlog (in bytes) for the specified queue. The backlog represents
* the amount of data in messages that are waiting to be delivered to consumers.
*/
long getBacklogBytes(String queueName) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,13 @@
* This interface serves as a blueprint for creating SempClient objects, which are used to interact
* with a Solace message broker using the Solace Element Management Protocol (SEMP).
*/
public interface SempClientFactory extends Serializable {}
public interface SempClientFactory extends Serializable {

/**
* This method is the core of the factory interface. It defines how to construct and return a
* SempClient object. Implementations of this interface will provide the specific logic for
* creating a client instance, which might involve connecting to the broker, handling
* authentication, and configuring other settings.
*/
SempClient create();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import java.io.Serializable;

/**
* The SessionService interface provides a set of methods for managing a session with the Solace
* messaging system. It allows for establishing a connection, creating a message-receiver object,
* checking if the connection is closed or not, and gracefully closing the session.
*/
public interface SessionService extends Serializable {

/**
* Establishes a connection to the service. This could involve providing connection details like
* host, port, VPN name, username, and password.
*/
void connect();

/** Gracefully closes the connection to the service. */
void close();

/**
* Checks whether the connection to the service is currently closed. This method is called when an
* `UnboundedSolaceReader` is starting to read messages - a session will be created if this
* returns true.
*/
boolean isClosed();

/**
* Creates a MessageReceiver object for receiving messages from Solace. Typically, this object is
* created from the session instance.
*/
MessageReceiver createReceiver();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,11 @@
* This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a
* queue property and mandates the implementation of a create() method in concrete subclasses.
*/
public abstract class SessionServiceFactory implements Serializable {}
public abstract class SessionServiceFactory implements Serializable {

/**
* This is the core method that subclasses must implement. It defines how to construct and return
* a SessionService object.
*/
public abstract SessionService create();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Checkpoint for an unbounded Solace source. Consists of the Solace messages waiting to be
* acknowledged.
*/
@DefaultCoder(AvroCoder.class)
class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
@Internal
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private transient AtomicBoolean activeReader;
// BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry
// these messages here. We relay on Solace's retry mechanism.
Expand Down
Loading

0 comments on commit 96b9de0

Please sign in to comment.