Skip to content

Commit

Permalink
Provide a session listener callback interface (#3460)
Browse files Browse the repository at this point in the history
Fixes #3457
  • Loading branch information
niloc132 authored and stanbrub committed Feb 27, 2023
1 parent c9a5e01 commit 5fa8203
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.deephaven.server.session;

import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;

import java.util.Collection;

/**
* Utility class to fan out session events to multiple handlers.
*/
public class DelegatingSessionListener implements SessionListener {
private static final Logger log = LoggerFactory.getLogger(DelegatingSessionListener.class);
private final Collection<SessionListener> sessionListeners;

public DelegatingSessionListener(Collection<SessionListener> sessionListeners) {
this.sessionListeners = sessionListeners;
}

@Override
public void onSessionCreate(SessionState session) {
for (SessionListener listener : sessionListeners) {
try {
listener.onSessionCreate(session);
} catch (Exception e) {
log.error().append("Error invoking session listener ").append(listener.getClass().getName()).append(e)
.endl();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.deephaven.server.session;

/**
* Callbacks for the SessionService, to observe session lifecycles.
*/
public interface SessionListener {
/**
* When a new session is created and has been given a refresh token, this will be invoked.
* <p>
* </p>
* To track a session closing, use {@link SessionState#addOnCloseCallback(java.io.Closeable)}.
*
* @param session the newly created session
*/
void onSessionCreate(SessionState session);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
import dagger.Binds;
import dagger.Module;
import dagger.Provides;
import dagger.multibindings.ElementsIntoSet;
import dagger.multibindings.IntoSet;
import io.deephaven.server.auth.AuthorizationProvider;
import io.deephaven.server.util.AuthorizationWrappedGrpcBinding;
import io.grpc.BindableService;
import io.grpc.ServerInterceptor;

import java.util.Collections;
import java.util.Set;

@Module
public interface SessionModule {
@Provides
Expand All @@ -30,4 +34,10 @@ ServerInterceptor bindSessionServiceInterceptor(
@Binds
@IntoSet
TicketResolver bindSessionTicketResolverServerSideExports(ExportTicketResolver resolver);

@Provides
@ElementsIntoSet
static Set<SessionListener> primeSessionListeners() {
return Collections.emptySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
Expand Down Expand Up @@ -62,10 +63,13 @@ public class SessionService {

private final Map<String, AuthenticationRequestHandler> authRequestHandlers;

private final SessionListener sessionListener;

@Inject
public SessionService(final Scheduler scheduler, final SessionState.Factory sessionFactory,
@Named("session.tokenExpireMs") final long tokenExpireMs,
Map<String, AuthenticationRequestHandler> authRequestHandlers) {
Map<String, AuthenticationRequestHandler> authRequestHandlers,
Set<SessionListener> sessionListeners) {
this.scheduler = scheduler;
this.sessionFactory = sessionFactory;
this.tokenExpireMs = tokenExpireMs;
Expand All @@ -87,6 +91,8 @@ public SessionService(final Scheduler scheduler, final SessionState.Factory sess
if (ProcessEnvironment.tryGet() != null) {
ProcessEnvironment.getGlobalFatalErrorReporter().addInterceptor(this::onFatalError);
}

this.sessionListener = new DelegatingSessionListener(sessionListeners);
}

private synchronized void onFatalError(
Expand Down Expand Up @@ -152,7 +158,8 @@ public void addTerminationListener(
*/
public SessionState newSession(final AuthContext authContext) {
final SessionState session = sessionFactory.create(authContext);
refreshToken(session, true);
checkTokenAndRotate(session, true);
sessionListener.onSessionCreate(session);
return session;
}

Expand All @@ -163,11 +170,11 @@ public SessionState newSession(final AuthContext authContext) {
* @return the most recent token expiration
*/
public TokenExpiration refreshToken(final SessionState session) {
return refreshToken(session, false);
return checkTokenAndRotate(session, false);
}

@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
private TokenExpiration refreshToken(final SessionState session, boolean initialToken) {
private TokenExpiration checkTokenAndRotate(final SessionState session, boolean initialToken) {
UUID newUUID;
TokenExpiration expiration;
final long nowMillis = scheduler.currentTimeMillis();
Expand Down Expand Up @@ -352,11 +359,8 @@ public void run() {
// token expiration time.
outstandingCookies.poll();

synchronized (next.session) {
final TokenExpiration tokenExpiration = next.session.getExpiration();
if (tokenExpiration != null && tokenExpiration.deadlineMillis <= nowMillis) {
next.session.onExpired();
}
if (next.session.isExpired()) {
next.session.onExpired();
}
} while (true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,8 @@ public void onExpired() {
log.info().append(logPrefix).append("releasing outstanding exports").endl();
synchronized (exportMap) {
exportMap.forEach(ExportObject::cancel);
exportMap.clear();
}
exportMap.clear();

log.info().append(logPrefix).append("outstanding exports released").endl();
synchronized (exportListeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void setup() {
scheduler = new TestControlledScheduler();
sessionService = new SessionService(scheduler,
authContext -> new SessionState(scheduler, TestExecutionContext::createForUnitTests, authContext),
TOKEN_EXPIRE_MS, Collections.emptyMap());
TOKEN_EXPIRE_MS, Collections.emptyMap(), Collections.emptySet());
applicationServiceGrpcImpl = new ApplicationServiceGrpcImpl(scheduler, sessionService,
new TypeLookup(ObjectTypeLookup.NoOp.INSTANCE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import org.junit.Test;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class SessionServiceTest {

Expand All @@ -23,14 +26,21 @@ public class SessionServiceTest {
private SafeCloseable livenessScope;
private TestControlledScheduler scheduler;
private SessionService sessionService;
private Consumer<SessionState> sessionStateCallable;

@Before
public void setup() {
livenessScope = LivenessScopeStack.open();
scheduler = new TestControlledScheduler();
sessionService = new SessionService(scheduler,
authContext -> new SessionState(scheduler, TestExecutionContext::createForUnitTests, authContext),
TOKEN_EXPIRE_MS, Collections.emptyMap());
TOKEN_EXPIRE_MS, Collections.emptyMap(), Collections.singleton(this::sessionCreatedCallback));
}

private void sessionCreatedCallback(SessionState sessionState) {
if (sessionStateCallable != null) {
sessionStateCallable.accept(sessionState);
}
}

@After
Expand All @@ -42,6 +52,22 @@ public void teardown() {
livenessScope = null;
}

@Test
public void testSessionCreationCallback() {
AtomicReference<SessionState> sessionReference = new AtomicReference<>(null);
AtomicInteger count = new AtomicInteger(0);

sessionStateCallable = newValue -> {
sessionReference.set(newValue);
count.incrementAndGet();
};

final SessionState session = sessionService.newSession(AUTH_CONTEXT);

Assert.eq(sessionReference.get(), "sessionReference.get()", session, "session");
Assert.eq(count.get(), "count.get()", 1);
}

@Test
public void testSessionExpiresOnClose() {
final SessionState session;
Expand Down

0 comments on commit 5fa8203

Please sign in to comment.