Skip to content

Commit

Permalink
use standard connection create inbox for discovery and also allow the…
Browse files Browse the repository at this point in the history
… user to provide their own supplier. (#857)
  • Loading branch information
scottf authored Mar 14, 2023
1 parent b2ff1bc commit 21b4fdc
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 35 deletions.
16 changes: 9 additions & 7 deletions src/main/java/io/nats/service/Discovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static io.nats.client.NUID.nextGlobal;
import static io.nats.service.Service.*;

/**
Expand All @@ -36,6 +36,8 @@ public class Discovery {
private final long maxTimeMillis;
private final int maxResults;

private Supplier<String> inboxSupplier;

public Discovery(Connection conn) {
this(conn, -1, -1);
}
Expand All @@ -44,6 +46,11 @@ public Discovery(Connection conn, long maxTimeMillis, int maxResults) {
this.conn = conn;
this.maxTimeMillis = maxTimeMillis < 1 ? DEFAULT_DISCOVERY_MAX_TIME_MILLIS : maxTimeMillis;
this.maxResults = maxResults < 1 ? DEFAULT_DISCOVERY_MAX_RESULTS : maxResults;
setInboxSupplier(null);
}

public void setInboxSupplier(Supplier<String> inboxSupplier) {
this.inboxSupplier = inboxSupplier == null ? conn::createInbox : inboxSupplier;
}

// ----------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -138,12 +145,7 @@ private byte[] discoverOne(String action, String serviceName, String serviceId)
private void discoverMany(String action, String serviceName, Consumer<byte[]> dataConsumer) {
Subscription sub = null;
try {
StringBuilder sb = new StringBuilder(nextGlobal()).append('-').append(action);
if (serviceName != null) {
sb.append('-').append(serviceName);
}
String replyTo = sb.toString();

String replyTo = inboxSupplier.get();
sub = conn.subscribe(replyTo);

String subject = toDiscoverySubject(action, serviceName, null);
Expand Down
89 changes: 61 additions & 28 deletions src/test/java/io/nats/service/ServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ public class ServiceTests extends JetStreamTestBase {

@Test
public void testServiceWorkflow() throws Exception {
try (NatsTestServer ts = new NatsTestServer())
{
try (NatsTestServer ts = new NatsTestServer()) {
try (Connection serviceNc1 = standardConnection(ts.getURI());
Connection serviceNc2 = standardConnection(ts.getURI());
Connection clientNc = standardConnection(ts.getURI())) {
Expand Down Expand Up @@ -166,12 +165,12 @@ public void testServiceWorkflow() throws Exception {
SchemaResponse schemaResponse2 = service2.getSchemaResponse();
StatsResponse statsResponse1 = service1.getStatsResponse();
StatsResponse statsResponse2 = service2.getStatsResponse();
EndpointResponse[] endpointResponseArray1 = new EndpointResponse[] {
EndpointResponse[] endpointResponseArray1 = new EndpointResponse[]{
service1.getEndpointStats(ECHO_ENDPOINT_NAME),
service1.getEndpointStats(SORT_ENDPOINT_ASCENDING_NAME),
service1.getEndpointStats(SORT_ENDPOINT_DESCENDING_NAME)
};
EndpointResponse[] endpointResponseArray2 = new EndpointResponse[] {
EndpointResponse[] endpointResponseArray2 = new EndpointResponse[]{
service2.getEndpointStats(ECHO_ENDPOINT_NAME),
service2.getEndpointStats(SORT_ENDPOINT_ASCENDING_NAME),
service2.getEndpointStats(SORT_ENDPOINT_DESCENDING_NAME)
Expand Down Expand Up @@ -214,8 +213,8 @@ public void testServiceWorkflow() throws Exception {
// info discovery
Verifier infoVerifier = (expected, response) -> {
assertTrue(response instanceof InfoResponse);
InfoResponse exp = (InfoResponse)expected;
InfoResponse r = (InfoResponse)response;
InfoResponse exp = (InfoResponse) expected;
InfoResponse r = (InfoResponse) response;
assertEquals(exp.getDescription(), r.getDescription());
assertEquals(exp.getSubjects(), r.getSubjects());
};
Expand All @@ -229,8 +228,8 @@ public void testServiceWorkflow() throws Exception {
// schema discovery
Verifier schemaVerifier = (expected, response) -> {
assertTrue(response instanceof SchemaResponse);
SchemaResponse exp = (SchemaResponse)expected;
SchemaResponse r = (SchemaResponse)response;
SchemaResponse exp = (SchemaResponse) expected;
SchemaResponse r = (SchemaResponse) response;
assertEquals(exp.getApiUrl(), r.getApiUrl());
assertEquals(exp.getEndpoints(), r.getEndpoints());
};
Expand All @@ -244,8 +243,8 @@ public void testServiceWorkflow() throws Exception {
// stats discovery
Verifier statsVerifier = (expected, response) -> {
assertTrue(response instanceof StatsResponse);
StatsResponse exp = (StatsResponse)expected;
StatsResponse sr = (StatsResponse)response;
StatsResponse exp = (StatsResponse) expected;
StatsResponse sr = (StatsResponse) response;
assertEquals(exp.getStarted(), sr.getStarted());
for (int x = 0; x < 3; x++) {
EndpointResponse er = exp.getEndpointStats().get(x);
Expand Down Expand Up @@ -301,10 +300,10 @@ interface Verifier {

@SuppressWarnings("unchecked")
private static void verifyDiscovery(Object oResponse, Verifier v, ServiceResponse... expectedResponses) {
List<Object> responses = oResponse instanceof List ? (List<Object>)oResponse : Collections.singletonList(oResponse);
List<Object> responses = oResponse instanceof List ? (List<Object>) oResponse : Collections.singletonList(oResponse);
assertEquals(expectedResponses.length, responses.size());
for (Object response : responses) {
ServiceResponse sr = (ServiceResponse)response;
ServiceResponse sr = (ServiceResponse) response;
ServiceResponse exp = find(expectedResponses, sr);
assertNotNull(exp);
assertEquals(exp.getType(), sr.getType());
Expand Down Expand Up @@ -341,8 +340,7 @@ private static void verifyServiceExecution(Connection nc, String endpointName, S
assertEquals(sortD(request), response);
break;
}
}
catch (Exception e) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -436,7 +434,8 @@ public void testDispatchers() throws Exception {

ServiceEndpoint se1 = ServiceEndpoint.builder()
.endpointName("dispatch")
.handler(m -> {})
.handler(m -> {
})
.dispatcher(dEnd)
.build();
Service service = new ServiceBuilder()
Expand Down Expand Up @@ -498,12 +497,14 @@ public void testDispatchers() throws Exception {

se1 = ServiceEndpoint.builder()
.endpointName("dispatch")
.handler(m -> {})
.handler(m -> {
})
.build();

ServiceEndpoint se2 = ServiceEndpoint.builder()
.endpointName("another")
.handler(m -> {})
.handler(m -> {
})
.build();

service = new ServiceBuilder()
Expand Down Expand Up @@ -535,7 +536,8 @@ public void testServiceBuilderConstruction() {
Connection conn = new MockNatsConnection(options);
ServiceEndpoint se = ServiceEndpoint.builder()
.endpoint(new Endpoint(name(0)))
.handler(m -> {})
.handler(m -> {
})
.build();

// minimum valid service
Expand Down Expand Up @@ -599,7 +601,9 @@ public void testHandlerException() throws Exception {
ServiceEndpoint exServiceEndpoint = ServiceEndpoint.builder()
.endpointName("exEndpoint")
.endpointSubject("exSubject")
.handler(m -> { throw new RuntimeException("handler-problem"); })
.handler(m -> {
throw new RuntimeException("handler-problem");
})
.build();

Service exService = new ServiceBuilder()
Expand Down Expand Up @@ -805,7 +809,7 @@ public void testEndpointConstruction() {
assertThrows(IllegalArgumentException.class, () -> Endpoint.builder().build());

// many names are bad
assertThrows(IllegalArgumentException.class, () -> new Endpoint((String)null));
assertThrows(IllegalArgumentException.class, () -> new Endpoint((String) null));
assertThrows(IllegalArgumentException.class, () -> new Endpoint(EMPTY));
assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_SPACE));
assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_PRINTABLE));
Expand Down Expand Up @@ -981,7 +985,8 @@ public void testServiceEndpointConstruction() {
Group g2 = new Group(subject(2)).appendGroup(g1);
Endpoint e1 = new Endpoint(name(100), subject(100));
Endpoint e2 = new Endpoint(name(200), subject(200));
ServiceMessageHandler smh = m -> {};
ServiceMessageHandler smh = m -> {
};
Supplier<JsonValue> sds = () -> null;

ServiceEndpoint se = ServiceEndpoint.builder()
Expand Down Expand Up @@ -1107,17 +1112,19 @@ public void testServiceResponsesConstruction() {
validateApiInOutInfoResponse(ir2);

List<EndpointResponse> endpoints = new ArrayList<>();
endpoints.add(new EndpointResponse("endName0", "endSubject0", new Schema("endSchemaRequest0", "endSchemaResponse0")));
endpoints.add(new EndpointResponse("endName1", "endSubject1", new Schema("endSchemaRequest1", "endSchemaResponse1")));
endpoints.add(new EndpointResponse("endName0", "endSubject0", new Schema("endSchemaRequest0", "endSchemaResponse0")));
endpoints.add(new EndpointResponse("endName1", "endSubject1", new Schema("endSchemaRequest1", "endSchemaResponse1")));
SchemaResponse sch1 = new SchemaResponse("id", "name", "0.0.0", "apiUrl", endpoints);
SchemaResponse sch2 = new SchemaResponse(sch1.toJson().getBytes());
validateApiInOutSchemaResponse(sch1);
validateApiInOutSchemaResponse(sch2);

ZonedDateTime serviceStarted = DateTimeUtils.gmtNow();
ZonedDateTime[] endStarteds = new ZonedDateTime[2];
sleep(100); endStarteds[0] = DateTimeUtils.gmtNow();
sleep(100); endStarteds[1] = DateTimeUtils.gmtNow();
sleep(100);
endStarteds[0] = DateTimeUtils.gmtNow();
sleep(100);
endStarteds[1] = DateTimeUtils.gmtNow();

List<EndpointResponse> statsList = new ArrayList<>();
JsonValue[] data = new JsonValue[]{supplyData(), supplyData()};
Expand Down Expand Up @@ -1200,8 +1207,8 @@ private static void validateApiInOutServiceResponse(ServiceResponse r, String ty
}

private static int _dataX = -1;
public static JsonValue supplyData()
{

public static JsonValue supplyData() {
_dataX++;
return new TestStatsData("s-" + _dataX, _dataX).toJsonValue();
}
Expand Down Expand Up @@ -1256,4 +1263,30 @@ public int hashCode() {
return result;
}
}
}

static class TestInboxSupplier implements Supplier<String> {
boolean wasCalled = false;
@Override
public String get() {
wasCalled = true;
return "CUSTOM_INBOX";
}
}

@Test
public void testInboxSupplier() throws Exception {
runInServer(nc -> {
Discovery discovery = new Discovery(nc, 100, 1);
TestInboxSupplier supplier = new TestInboxSupplier();
discovery.setInboxSupplier(supplier);
try {
discovery.ping("servicename");
}
catch (Exception e) {
// we know it will throw exception b/c there is no service
// running, we just care about it make the call
}
assertTrue(supplier.wasCalled);
});
}
}

0 comments on commit 21b4fdc

Please sign in to comment.