Skip to content

Commit

Permalink
drop KeycloakSession use in send() method (#32)
Browse files Browse the repository at this point in the history
* logging and drop KeycloakSession use in send() method

* moved logging back to debug
  • Loading branch information
xgp authored Jul 20, 2023
1 parent 2ebdebe commit 29bad71
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import lombok.extern.jbosslog.JBossLog;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.keycloak.broker.provider.util.SimpleHttp;
import org.keycloak.models.KeycloakSession;
import org.keycloak.util.JsonSerialization;
Expand All @@ -36,7 +38,7 @@ public HttpSenderEventListenerProvider(KeycloakSession session, ScheduledExecuto
@Override
BackOff getBackOff() {
boolean retry = getBooleanOr(config, RETRY, true);
log.infof("Retry is %b %s", retry, getOr(config, RETRY, "[empty]"));
log.debugf("Retry is %b %s", retry, getOr(config, RETRY, "[empty]"));
if (!retry) return BackOff.STOP_BACKOFF;
else
return new ExponentialBackOff.Builder()
Expand Down Expand Up @@ -68,18 +70,25 @@ void send(SenderTask task) throws SenderException, IOException {
protected void send(
SenderTask task, String targetUri, Optional<String> sharedSecret, Optional<String> algorithm)
throws SenderException, IOException {
SimpleHttp request = SimpleHttp.doPost(targetUri, session).json(task.getEvent());
sharedSecret.ifPresent(
secret ->
request.header(
"X-Keycloak-Signature",
hmacFor(task.getEvent(), secret, algorithm.orElse(HMAC_SHA256_ALGORITHM))));
SimpleHttp.Response response = request.asResponse();
int status = response.getStatus();
log.debugf("sent to %s (%d)", targetUri, status);
if (status < HTTP_OK || status >= HTTP_MULT_CHOICE) { // any 2xx is acceptable
log.warnf("Sending failure (Server response:%d)", status);
throw new SenderException(true);
log.debugf("attempting send to %s", targetUri);
try (CloseableHttpClient http = HttpClients.createDefault()) {
// SimpleHttp request = SimpleHttp.doPost(targetUri, session).json(task.getEvent());
SimpleHttp request = SimpleHttp.doPost(targetUri, http).json(task.getEvent());
sharedSecret.ifPresent(
secret ->
request.header(
"X-Keycloak-Signature",
hmacFor(task.getEvent(), secret, algorithm.orElse(HMAC_SHA256_ALGORITHM))));
SimpleHttp.Response response = request.asResponse();
int status = response.getStatus();
log.debugf("sent to %s (%d)", targetUri, status);
if (status < HTTP_OK || status >= HTTP_MULT_CHOICE) { // any 2xx is acceptable
log.warnf("Sending failure (Server response:%d)", status);
throw new SenderException(true);
}
} catch (Exception e) {
log.warnf(e, "Sending exception to %s", targetUri);
throw new SenderException(false, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ public abstract class MultiEventListenerProviderFactory extends AbstractEventLis

@Override
public MultiEventListenerProvider create(KeycloakSession session) {
ExecutorService exec =
session.getProvider(ExecutorsProvider.class).getExecutor("multi-event-provider-threads");
List<EventListenerProvider> providers =
getConfigurations(session).stream()
.map(config -> configure(session, config))
.collect(Collectors.toList());
return new MultiEventListenerProvider(session, providers, isAsync(), exec);
try {
ExecutorService exec =
session.getProvider(ExecutorsProvider.class).getExecutor("multi-event-provider-threads");
List<EventListenerProvider> providers =
getConfigurations(session).stream()
.map(config -> configure(session, config))
.collect(Collectors.toList());
return new MultiEventListenerProvider(session, providers, isAsync(), exec);
} catch (Exception e) {
log.warn("Error configuring provider", e);
throw new IllegalStateException(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void onEvent(AdminEvent event, boolean b) {
@Override
public void close() {
// close this instance of the event listener
log.debugf("called close() on SenderEventListenerProvider");
}

class SenderTask {
Expand Down Expand Up @@ -103,26 +104,30 @@ protected void schedule(SenderTask task, long delay, TimeUnit unit) {
log.warn("Task scheduled after shutdown initiated");
return;
}
exec.schedule(
() -> {
try {
send(task);
} catch (SenderException | IOException e) {
log.debug("sending exception", e);
if (e instanceof SenderException && !((SenderException) e).isRetryable()) return;
log.infof(
"BackOff policy is %s",
BackOff.STOP_BACKOFF == task.getBackOff() ? "STOP" : "BACKOFF");
long backOffTime = task.getBackOff().nextBackOffMillis();
if (backOffTime == BackOff.STOP) return;
log.infof("retrying in %d due to %s", backOffTime, e.getCause());
schedule(task, backOffTime, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.warn("Uncaught Sender error", t);
}
},
delay,
unit);
try {
exec.schedule(
() -> {
try {
send(task);
} catch (SenderException | IOException e) {
log.debug("sending exception", e);
if (e instanceof SenderException && !((SenderException) e).isRetryable()) return;
log.debugf(
"BackOff policy is %s",
BackOff.STOP_BACKOFF == task.getBackOff() ? "STOP" : "BACKOFF");
long backOffTime = task.getBackOff().nextBackOffMillis();
if (backOffTime == BackOff.STOP) return;
log.debugf("retrying in %d due to %s", backOffTime, e.getCause());
schedule(task, backOffTime, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.warn("Uncaught Sender error", t);
}
},
delay,
unit);
} catch (Exception e) {
log.warn("Error scheduling task", e);
}
}

abstract void send(SenderTask task) throws SenderException, IOException;
Expand Down

0 comments on commit 29bad71

Please sign in to comment.