Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SolaceIO: separate auth and session settings #32406

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Support for writing to [Solace message queues](https://solace.com/) (`SolaceIO.Write`) added (Java) ([#31905](https://github.com/apache/beam/issues/31905)).

## New Features / Improvements

Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/solace/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {
implementation library.java.proto_google_cloud_secret_manager_v1
implementation library.java.protobuf_java
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.vendored_grpc_1_60_1
implementation project(":sdks:java:extensions:avro")
implementation library.java.avro
permitUnusedDeclared library.java.avro
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,28 @@
import org.apache.beam.sdk.io.solace.data.Solace.SolaceRecordMapper;
import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource;
import org.apache.beam.sdk.io.solace.write.SolaceOutput;
import org.apache.beam.sdk.io.solace.write.UnboundedBatchedSolaceWriter;
import org.apache.beam.sdk.io.solace.write.UnboundedSolaceWriter;
import org.apache.beam.sdk.io.solace.write.UnboundedStreamingSolaceWriter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -195,7 +208,7 @@
*
* <p>See {@link Read#withSessionServiceFactory(SessionServiceFactory)} for session authentication.
* The connector provides implementation of the {@link SessionServiceFactory} using the Basic
* Authentication: {@link org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService}.
* Authentication: {@link org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory}.
*
* <p>For the authentication to the SEMP API ({@link Read#withSempClientFactory(SempClientFactory)})
* the connector provides {@link org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} to
Expand Down Expand Up @@ -620,9 +633,8 @@ public Read<T> withSempClientFactory(SempClientFactory sempClientFactory) {
* <li>create a {@link org.apache.beam.sdk.io.solace.broker.MessageReceiver}.
* </ul>
*
* <p>An existing implementation of the SempClientFactory includes {@link
* org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService} which implements the Basic
* Authentication to Solace. *
* <p>The {@link BasicAuthJcsmpSessionServiceFactory} is an existing implementation of the
* {@link SessionServiceFactory} which implements the Basic Authentication to Solace.
*
* <p>To use it, specify the credentials with the builder methods.
*
Expand Down Expand Up @@ -805,7 +817,8 @@ private Queue initializeQueueForTopicIfNeeded(

/**
 * Submission modes that can be selected for the Solace write connector.
 *
 * <p>NOTE(review): the exact dispatch behaviour behind {@code HIGHER_THROUGHPUT} and {@code
 * LOWER_LATENCY} is implemented by the writers, outside this declaration; confirm it there.
 */
public enum SubmissionMode {
  HIGHER_THROUGHPUT,
  LOWER_LATENCY,
  /** Sends acks one by one. This is very slow; never use it in an actual pipeline. */
  TESTING
}

public enum WriterType {
Expand All @@ -816,6 +829,8 @@ public enum WriterType {
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, SolaceOutput> {

private static final Logger LOG = LoggerFactory.getLogger(Write.class);

public static final TupleTag<Solace.PublishResult> FAILED_PUBLISH_TAG =
new TupleTag<Solace.PublishResult>() {};
public static final TupleTag<Solace.PublishResult> SUCCESSFUL_PUBLISH_TAG =
Expand Down Expand Up @@ -961,6 +976,21 @@ public Write<T> withWriterType(WriterType writerType) {
return toBuilder().setWriterType(writerType).build();
}

/**
 * Sets the function that turns each input element into a {@link Solace.Record}.
 *
 * <p>When the input uses a custom data class, the function must return the {@link Solace.Record}
 * that corresponds to each instance of that class.
 *
 * <p>When the function is used together with dynamic destinations, it is also responsible for
 * setting the right destination value on every {@link Solace.Record} it returns.
 *
 * <p>In every other case the format function is optional.
 *
 * @param formatFunction converts an input element into the {@link Solace.Record} to publish
 * @return a {@code Write} transform built from this one with the format function set
 */
public Write<T> withFormatFunction(SerializableFunction<T, Solace.Record> formatFunction) {
  return toBuilder()
      .setFormatFunction(formatFunction)
      .build();
}

/**
* Set the provider used to obtain the properties to initialize a new session in the broker.
*
Expand Down Expand Up @@ -1026,8 +1056,180 @@ abstract static class Builder<T> {

/**
 * Expands this transform into the steps that publish the input elements to Solace.
 *
 * <p>The expansion validates the configuration, optionally formats the input into {@link
 * Solace.Record}, remembers the windowing strategy of the input, moves the records into the
 * global window, assigns shard keys and applies the streaming or batched writer. For {@code
 * PERSISTENT} delivery the publish results are re-windowed with the remembered strategy; for any
 * other delivery mode the returned {@link SolaceOutput} carries no result collections.
 *
 * @param input the elements to publish
 * @return the output of the write, holding the failed and successful publish results when the
 *     delivery mode is {@code PERSISTENT}
 */
@Override
public SolaceOutput expand(PCollection<T> input) {
  Class<? super T> pcollClass = checkNotNull(input.getTypeDescriptor()).getRawType();
  // The input can be used as-is only when its element type is Solace.Record or one of its
  // subclasses (for instance the AutoValue-generated implementation). Any other type, including
  // supertypes such as Object, needs a format function.
  boolean usingSolaceRecord = Solace.Record.class.isAssignableFrom(pcollClass);

  validateWriteTransform(usingSolaceRecord);

  boolean usingDynamicDestinations = getDestination() == null;
  SerializableFunction<Solace.Record, Destination> destinationFn;
  if (usingDynamicDestinations) {
    destinationFn = x -> SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
  } else {
    // Constant destination for all messages (same topic or queue).
    // This should not be null: getDestination() was non-null in the branch condition above.
    destinationFn = x -> checkNotNull(getDestination());
  }

  @SuppressWarnings("unchecked")
  PCollection<Solace.Record> records =
      getFormatFunction() == null
          ? (PCollection<Solace.Record>) input
          : input.apply(
              "Format records",
              MapElements.into(TypeDescriptor.of(Solace.Record.class))
                  .via(checkNotNull(getFormatFunction())));

  // Store the current window used by the input
  PCollection<Solace.PublishResult> captureWindow =
      records.apply(
          "Capture window", ParDo.of(new UnboundedSolaceWriter.RecordToPublishResultDoFn()));

  @SuppressWarnings("unchecked")
  WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy =
      (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
          captureWindow.getWindowingStrategy();

  PCollection<Solace.Record> withGlobalWindow =
      records.apply("Global window", Window.into(new GlobalWindows()));

  PCollection<KV<Integer, Solace.Record>> withShardKeys =
      withGlobalWindow.apply(
          "Add shard key",
          ParDo.of(new UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));

  String label =
      getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : "Publish (batched)";

  PCollectionTuple solaceOutput = withShardKeys.apply(label, getWriterTransform(destinationFn));

  SolaceOutput output;
  if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
    PCollection<Solace.PublishResult> failedPublish = solaceOutput.get(FAILED_PUBLISH_TAG);
    PCollection<Solace.PublishResult> successfulPublish =
        solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
    // Give the results back the windowing strategy the input had before the global window.
    output =
        rewindow(
            SolaceOutput.in(input.getPipeline(), failedPublish, successfulPublish),
            windowingStrategy);
  } else {
    LOG.info(
        "Solace.Write: omitting writer output because delivery mode is {}", getDeliveryMode());
    output = SolaceOutput.in(input.getPipeline(), null, null);
  }

  return output;
}

/**
 * Builds the multi-output {@link ParDo} that publishes the sharded records.
 *
 * <p>The streaming writer is used when the writer type is {@code STREAMING}; otherwise the
 * batched writer is used. Both receive the same configuration values. Failed publish results go
 * to the main output tag and successful ones to the additional output tag.
 *
 * @param destinationFn resolves the destination of each record
 * @return the writer transform with its failed and successful output tags
 */
private ParDo.MultiOutput<KV<Integer, Solace.Record>, Solace.PublishResult> getWriterTransform(
    SerializableFunction<Solace.Record, Destination> destinationFn) {

  boolean streaming = getWriterType() == WriterType.STREAMING;

  ParDo.SingleOutput<KV<Integer, Solace.Record>, Solace.PublishResult> publishStep =
      ParDo.of(
          streaming
              ? new UnboundedStreamingSolaceWriter.WriterDoFn(
                  destinationFn,
                  checkNotNull(getSessionServiceFactory()),
                  getDeliveryMode(),
                  getDispatchMode(),
                  getNumberOfClientsPerWorker(),
                  getPublishLatencyMetrics())
              : new UnboundedBatchedSolaceWriter.WriterDoFn(
                  destinationFn,
                  checkNotNull(getSessionServiceFactory()),
                  getDeliveryMode(),
                  getDispatchMode(),
                  getNumberOfClientsPerWorker(),
                  getPublishLatencyMetrics()));

  TupleTagList additionalOutputs = TupleTagList.of(SUCCESSFUL_PUBLISH_TAG);
  return publishStep.withOutputTags(FAILED_PUBLISH_TAG, additionalOutputs);
}

/**
 * Re-applies the given windowing strategy to the publish results held by a {@link SolaceOutput}.
 *
 * <p>A result collection that is {@code null} stays {@code null} in the returned output.
 *
 * @param solacePublishResult the output whose result collections should be re-windowed
 * @param strategy the windowing strategy to apply to each non-null collection
 * @return a new {@link SolaceOutput} on the same pipeline holding the re-windowed collections
 */
private SolaceOutput rewindow(
    SolaceOutput solacePublishResult,
    WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
  PCollection<Solace.PublishResult> successful = solacePublishResult.getSuccessfulPublish();
  PCollection<Solace.PublishResult> unsuccessful = solacePublishResult.getFailedPublish();

  PCollection<Solace.PublishResult> successfulRewindowed =
      successful == null
          ? null
          : applyOriginalWindow(successful, strategy, "Rewindow correct");

  PCollection<Solace.PublishResult> unsuccessfulRewindowed =
      unsuccessful == null
          ? null
          : applyOriginalWindow(unsuccessful, strategy, "Rewindow failed");

  return SolaceOutput.in(
      solacePublishResult.getPipeline(), unsuccessfulRewindowed, successfulRewindowed);
}

/**
 * Applies a {@link Window} transform rebuilt from {@code strategy} to {@code pcoll}.
 *
 * <p>The rebuilt window accumulates fired panes when the strategy's mode is {@code
 * ACCUMULATING_FIRED_PANES} and discards them for any other mode.
 *
 * @param pcoll the publish results to re-window
 * @param strategy the windowing strategy to reproduce
 * @param label the name given to the applied transform
 * @return the re-windowed collection
 */
private static PCollection<Solace.PublishResult> applyOriginalWindow(
    PCollection<Solace.PublishResult> pcoll,
    WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy,
    String label) {
  Window<Solace.PublishResult> base = captureWindowDetails(strategy);

  boolean accumulating =
      strategy.getMode() == WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES;
  Window<Solace.PublishResult> rebuilt =
      accumulating ? base.accumulatingFiredPanes() : base.discardingFiredPanes();

  return pcoll.apply(label, rebuilt);
}

/**
 * Rebuilds a {@link Window} transform from the window function, allowed lateness, on-time
 * behavior, timestamp combiner and trigger of the given strategy.
 *
 * @param strategy the windowing strategy to copy the settings from
 * @return a {@link Window} transform configured with those settings
 */
private static Window<Solace.PublishResult> captureWindowDetails(
    WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
  Window<Solace.PublishResult> window =
      Window.<Solace.PublishResult>into(strategy.getWindowFn());
  window = window.withAllowedLateness(strategy.getAllowedLateness());
  window = window.withOnTimeBehavior(strategy.getOnTimeBehavior());
  window = window.withTimestampCombiner(strategy.getTimestampCombiner());
  return window.triggering(strategy.getTrigger());
}

/**
 * Checks that this transform is fully and correctly specified before it is expanded.
 *
 * <p>The checks run in this order: a format function is present when the input is not {@link
 * Solace.Record}; the number of used workers and of clients per worker are positive; the
 * delivery mode is {@code DIRECT} or {@code PERSISTENT}; publish latency metrics are only
 * enabled with {@code PERSISTENT} delivery; and a session service factory has been set.
 *
 * @param usingSolaceRecords whether the input elements are already {@link Solace.Record}
 * @throws IllegalArgumentException if any of the checks fails
 */
private void validateWriteTransform(boolean usingSolaceRecords) {
  // A format function is only mandatory when the input is not already Solace.Record.
  Preconditions.checkArgument(
      usingSolaceRecords || getFormatFunction() != null,
      "SolaceIO.Write: If you are not using Solace.Record as the input type, you"
          + " must set a format function using withFormatFunction().");

  Preconditions.checkArgument(
      getMaxNumOfUsedWorkers() > 0,
      "SolaceIO.Write: The number of used workers must be positive.");
  Preconditions.checkArgument(
      getNumberOfClientsPerWorker() > 0,
      "SolaceIO.Write: The number of clients per worker must be positive.");

  DeliveryMode deliveryMode = getDeliveryMode();
  boolean persistent = deliveryMode == DeliveryMode.PERSISTENT;
  Preconditions.checkArgument(
      deliveryMode == DeliveryMode.DIRECT || persistent,
      String.format(
          "SolaceIO.Write: Delivery mode must be either DIRECT or PERSISTENT. %s"
              + " not supported",
          deliveryMode));

  // Latency metrics are rejected for every delivery mode other than PERSISTENT.
  Preconditions.checkArgument(
      !getPublishLatencyMetrics() || persistent,
      "SolaceIO.Write: Publish latency metrics can only be enabled for PERSISTENT"
          + " delivery mode.");

  Preconditions.checkArgument(
      getSessionServiceFactory() != null,
      "SolaceIO: You need to pass a session service factory. For basic"
          + " authentication, you can use BasicAuthJcsmpSessionServiceFactory.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
package org.apache.beam.sdk.io.solace.broker;

import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.JCSMPProperties;

/**
* A factory for creating {@link BasicAuthJcsmpSessionService} instances. Extends {@link
* A factory for creating {@link JcsmpSessionService} instances. Extends {@link
* SessionServiceFactory}.
*
* <p>This factory provides a way to create {@link BasicAuthJcsmpSessionService} instances with
* authenticate to Solace with Basic Authentication.
* <p>This factory provides a way to create {@link JcsmpSessionService} instances that use Basic
* Authentication.
*/
@AutoValue
public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory {
Expand Down Expand Up @@ -65,11 +65,14 @@ public abstract static class Builder {

/**
 * Creates a {@link SessionService} whose JCSMP properties are set up for Basic Authentication.
 *
 * <p>The VPN name, username, password and host come from this factory's own values, and the
 * authentication scheme is set to {@code AUTHENTICATION_SCHEME_BASIC}. The session service is
 * created with those properties and the value returned by {@code getQueue()}.
 *
 * @return the session service built from these properties
 */
@Override
public SessionService create() {
  JCSMPProperties jcsmpProperties = new JCSMPProperties();
  jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, vpnName());
  jcsmpProperties.setProperty(
      JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
  jcsmpProperties.setProperty(JCSMPProperties.USERNAME, username());
  jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, password());
  jcsmpProperties.setProperty(JCSMPProperties.HOST, host());

  return JcsmpSessionService.create(jcsmpProperties, getQueue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@
*/
package org.apache.beam.sdk.io.solace.broker;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.http.HttpRequestFactory;
import com.solacesystems.jcsmp.JCSMPFactory;
import java.io.IOException;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.solace.data.Semp.Queue;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,8 +36,6 @@
@Internal
public class BasicAuthSempClient implements SempClient {
private static final Logger LOG = LoggerFactory.getLogger(BasicAuthSempClient.class);
private final ObjectMapper objectMapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

private final SempBasicAuthClientExecutor sempBasicAuthClientExecutor;

Expand All @@ -58,13 +52,12 @@ public BasicAuthSempClient(

/**
 * Asks the SEMP executor whether the given queue is non-exclusive and logs the answer.
 *
 * @param queueName the name of the queue to query
 * @return the value reported by the SEMP executor for this queue
 * @throws IOException propagated from the SEMP executor call
 */
@Override
public boolean isQueueNonExclusive(String queueName) throws IOException {
  boolean queueNonExclusive = sempBasicAuthClientExecutor.isQueueNonExclusive(queueName);
  LOG.info(
      "SolaceIO.Read: SempOperations: queried SEMP if queue {} is non-exclusive: {}",
      queueName,
      queueNonExclusive);
  return queueNonExclusive;
}

@Override
Expand All @@ -77,12 +70,7 @@ public com.solacesystems.jcsmp.Queue createQueueForTopic(String queueName, Strin

/**
 * Returns the backlog size, in bytes, that the SEMP executor reports for the given queue.
 *
 * @param queueName the name of the queue to query
 * @return the value returned by the SEMP executor for this queue
 * @throws IOException propagated from the SEMP executor call
 */
@Override
public long getBacklogBytes(String queueName) throws IOException {
  return sempBasicAuthClientExecutor.getBacklogBytes(queueName);
}

private void createQueue(String queueName) throws IOException {
Expand All @@ -94,9 +82,4 @@ private void createSubscription(String queueName, String topicName) throws IOExc
LOG.info("SolaceIO.Read: Creating new subscription {} for topic {}.", queueName, topicName);
sempBasicAuthClientExecutor.createSubscriptionResponse(queueName, topicName);
}

/**
 * Deserializes a JSON document into an instance of the requested class using this client's
 * {@code objectMapper}.
 *
 * @param content the JSON text to parse
 * @param mapSuccessToClass the class to map the JSON onto
 * @return the deserialized instance
 * @throws JsonProcessingException if the object mapper cannot process the content
 */
private <T> T mapJsonToClass(String content, Class<T> mapSuccessToClass)
    throws JsonProcessingException {
  T parsed = objectMapper.readValue(content, mapSuccessToClass);
  return parsed;
}
}
Loading
Loading