Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Upgrades Jackson to fix this issue.
Browse files Browse the repository at this point in the history
FasterXML/jackson-databind#656

Upgrades events to be polymorphic to allow for easy addition and mapping of events without 1 large class for all events
  • Loading branch information
Todd Nine committed Aug 4, 2015
1 parent 29d115f commit 491008e
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,31 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import com.codahale.metrics.Histogram;
import com.google.common.base.Preconditions;

import org.apache.usergrid.corepersistence.asyncevents.model.*;
import org.apache.usergrid.corepersistence.index.*;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.queue.QueueManager;
Expand All @@ -56,7 +60,9 @@

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;

Expand Down Expand Up @@ -211,43 +217,42 @@ public void ack(final QueueMessage message) {

}

private void handleMessages(final List<QueueMessage> messages) {
if (logger.isDebugEnabled()) logger.debug("handleMessages with {} message", messages.size());

for (QueueMessage message : messages) {
final AsyncEvent event = (AsyncEvent) message.getBody();

if (logger.isDebugEnabled()) logger.debug("Processing {} event", event.getEventType());

if (event == null || event.getEventType() == null) {
logger.error("AsyncEvent type or event is null!");
} else {
switch (event.getEventType()) {
private void handleMessages( final List<QueueMessage> messages ) {
if ( logger.isDebugEnabled() ) {
logger.debug( "handleMessages with {} message", messages.size() );
}

case EDGE_DELETE:
handleEdgeDelete(message);
break;
for ( QueueMessage message : messages ) {
final AsyncEvent event = ( AsyncEvent ) message.getBody();

case EDGE_INDEX:
handleEdgeIndex(message);
break;
logger.debug( "Processing {} event", event );

case ENTITY_DELETE:
handleEntityDelete(message);
break;
if ( event == null ) {
logger.error( "AsyncEvent type or event is null!" );
continue;
}

case ENTITY_INDEX:
handleEntityIndexUpdate(message);
break;

case APPLICATION_INDEX:
handleInitializeApplicationIndex(message);
break;
if ( event instanceof EdgeDeleteEvent ) {
handleEdgeDelete( message );
}
else if ( event instanceof EdgeIndexEvent ) {
handleEdgeIndex( message );
}

default:
logger.error("Unknown EventType: {}", event.getEventType());
else if ( event instanceof EntityDeleteEvent ) {
handleEntityDelete( message );
}
else if ( event instanceof EntityIndexEvent ) {
handleEntityIndexUpdate( message );
}

}
else if ( event instanceof InitializeApplicationIndexEvent ) {
handleInitializeApplicationIndex( message );
}
else {
logger.error( "Unknown EventType: {}", event );
}

messageCycle.update( System.currentTimeMillis() - event.getCreationTime() );
Expand All @@ -257,7 +262,8 @@ private void handleMessages(final List<QueueMessage> messages) {

@Override
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
applicationScope );
offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
}

Expand All @@ -272,19 +278,22 @@ public void queueEntityIndexUpdate(final ApplicationScope applicationScope,

public void handleEntityIndexUpdate(final QueueMessage message) {

Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityIndexUpdate");
Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );

final EntityIndexEvent event = (EntityIndexEvent) message.getBody();
final AsyncEvent event = ( AsyncEvent ) message.getBody();

Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_INDEX, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getEventType()));
Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass()));

final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;


//process the entity immediately
//only process the same version, otherwise ignore
final EntityIdScope entityIdScope = event.getEntityIdScope();
final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope();
final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
final Id entityId = entityIdScope.getId();
final long updatedAfter = event.getUpdatedAfter();
final long updatedAfter = entityIndexEvent.getUpdatedAfter();

final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);

Expand All @@ -310,17 +319,19 @@ public void handleEdgeIndex(final QueueMessage message) {

final AsyncEvent event = (AsyncEvent) message.getBody();

Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEdgeIndex");
Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_INDEX, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getEventType()));
Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" );
Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass()));

final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;

final ApplicationScope applicationScope = event.getApplicationScope();
final Edge edge = event.getEdge();
final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
final Edge edge = edgeIndexEvent.getEdge();



final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );

final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(event.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
applicationScope, entity, edge ) );

subscibeAndAck( edgeIndexObservable, message );
Expand All @@ -339,11 +350,14 @@ public void handleEdgeDelete(final QueueMessage message) {

final AsyncEvent event = (AsyncEvent) message.getBody();

Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEdgeDelete");
Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_DELETE, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getEventType()));
Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" );
Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass()));

final ApplicationScope applicationScope = event.getApplicationScope();
final Edge edge = event.getEdge();

final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;

final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
final Edge edge = edgeIndexEvent.getEdge();

if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);

Expand All @@ -364,12 +378,14 @@ public void handleEntityDelete(final QueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");

final AsyncEvent event = (AsyncEvent) message.getBody();
Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityDelete");
Preconditions.checkArgument( event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE,
String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType() ) );
Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" );
Preconditions.checkArgument( event instanceof EntityDeleteEvent,
String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) );

final ApplicationScope applicationScope = event.getApplicationScope();
final Id entityId = event.getEntityId();

final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
final Id entityId = entityDeleteEvent.getEntityIdScope().getId();

if (logger.isDebugEnabled())
logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
Expand All @@ -391,10 +407,13 @@ public void handleInitializeApplicationIndex(final QueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");

final AsyncEvent event = (AsyncEvent) message.getBody();
Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex");
Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));
Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex" );
Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));

final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
( InitializeApplicationIndexEvent ) event;

final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy();
final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
index.initialize();
ack( message );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,15 @@ public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOper
entity -> {
final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );

/**
* We don't have a modified field, so we can't check, pass it through
*/
if ( modified == null ) {
return true;
}

//only re-index if it has been updated and been updated after our timestamp
return modified != null && modified.getValue() >= entityIndexOperation.getUpdatedSince();
return modified.getValue() >= entityIndexOperation.getUpdatedSince();
} )
//perform indexing on the task scheduler and start it
.flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
Expand Down
Loading

0 comments on commit 491008e

Please sign in to comment.