Skip to content

Commit

Permalink
issue #1083: does not continue in the work-stage when document could …
Browse files Browse the repository at this point in the history
…be skipped
  • Loading branch information
mrk-vi committed Oct 15, 2024
1 parent a75c7bc commit 246e9fc
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,7 @@ yield initPipeline(
dataPayload.getContentId()
);

var buffer = Json.encodeToBuffer(dataPayload);

replyTo.tell(new Processor.Success(buffer.getBytes(), heldMessage));
replyTo.tell(new Skip(heldMessage));

return Behaviors.stopped();
}
Expand Down Expand Up @@ -400,4 +398,6 @@ private record InternalResponseWrapper(byte[] jsonObject) implements Processor.C

private record InternalError(DataProcessException exception) implements Processor.Command {}

public record Skip(HeldMessage heldMessage) implements Processor.Response {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.openk9.common.util.ShardingKey;
import io.openk9.common.util.ingestion.PayloadType;
import io.openk9.datasource.pipeline.actor.DataProcessException;
import io.openk9.datasource.pipeline.actor.EnrichPipeline;
import io.openk9.datasource.pipeline.actor.Scheduling;
import io.openk9.datasource.pipeline.actor.WorkStageException;
import io.openk9.datasource.pipeline.actor.common.AggregateBehavior;
Expand Down Expand Up @@ -252,6 +253,11 @@ private Behavior<Command> onPostProcess(PostProcess postProcess) {
heldMessage
));

}
else if (response instanceof EnrichPipeline.Skip) {

this.replyTo.tell(new Done(heldMessage));

}
else if (response instanceof Processor.Failure failure) {

Expand Down

0 comments on commit 246e9fc

Please sign in to comment.