diff --git a/src/main/java/com/zendesk/maxwell/Maxwell.java b/src/main/java/com/zendesk/maxwell/Maxwell.java index f363c06a4..191666ded 100644 --- a/src/main/java/com/zendesk/maxwell/Maxwell.java +++ b/src/main/java/com/zendesk/maxwell/Maxwell.java @@ -29,7 +29,7 @@ private void initFirstRun(Connection connection) throws SQLException, IOExceptio SchemaStore store = new SchemaStore(connection, this.context.getServerID(), this.schema, pos); store.save(); - this.context.setInitialPosition(pos); + this.context.setPosition(pos); } private void run(String[] argv) throws Exception { diff --git a/src/main/java/com/zendesk/maxwell/MaxwellAbstractRowsEvent.java b/src/main/java/com/zendesk/maxwell/MaxwellAbstractRowsEvent.java index 8bb91a270..593fa94c6 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellAbstractRowsEvent.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellAbstractRowsEvent.java @@ -102,6 +102,10 @@ public void setTXCommit() { this.txCommit = true; } + public boolean isTXCommit() { + return txCommit; + } + @Override public String toString() { diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index f2e0cb9fb..75c4a0f56 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -56,11 +56,16 @@ public BinlogPosition getInitialPosition() throws FileNotFoundException, IOExcep return this.initialPosition; } - public void setInitialPosition(BinlogPosition position) throws SQLException { + public void setPosition(MaxwellAbstractRowsEvent e) throws SQLException { + if ( e.isTXCommit() ) { + this.setPosition(e.getNextBinlogPosition()); + } + } + public void setPosition(BinlogPosition position) throws SQLException { this.getSchemaPosition().set(position); } - public void setInitialPositionSync(BinlogPosition position) throws SQLException { + public void setPositionSync(BinlogPosition position) throws SQLException { this.getSchemaPosition().setSync(position); } diff --git a/src/main/java/com/zendesk/maxwell/MaxwellParser.java b/src/main/java/com/zendesk/maxwell/MaxwellParser.java index 2a26a2c6c..f16deba43 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellParser.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellParser.java @@ -99,17 +99,19 @@ public void stop(long timeoutMS) throws TimeoutException { throw new TimeoutException("Maxwell's main parser thread didn't die after " + timeoutMS + "ms."); } + private void ensureReplicatorThread() throws Exception { + if ( !replicator.isRunning() ) { + LOGGER.warn("open-replicator stopped at position " + replicator.getBinlogFileName() + ":" + replicator.getBinlogPosition() + " -- restarting"); + replicator.start(); + } + } + private void doRun() throws Exception { MaxwellAbstractRowsEvent event; while ( this.runState == RunState.RUNNING ) { event = getEvent(); - if ( !replicator.isRunning() ) { - LOGGER.warn("open-replicator stopped at position " + replicator.getBinlogFileName() + ":" + replicator.getBinlogPosition() + " -- restarting"); - replicator.start(); - } - context.ensurePositionThread(); if ( event == null ) @@ -118,9 +120,6 @@ private void doRun() throws Exception { if ( !skipEvent(event)) { producer.push(event); } - - replicator.setBinlogFileName(event.getBinlogFilename()); - replicator.setBinlogPosition(event.getHeader().getNextPosition()); } try { @@ -199,8 +198,10 @@ private LinkedList getTransactionEvents() throws Excep while ( true ) { v4Event = pollV4EventFromQueue(); - if (v4Event == null) + if (v4Event == null) { + ensureReplicatorThread(); continue; + } switch(v4Event.getHeader().getEventType()) { case MySQLConstants.WRITE_ROWS_EVENT: @@ -214,6 +215,9 @@ private LinkedList getTransactionEvents() throws Excep if ( event.matchesFilter() ) list.add(event); + + setReplicatorPosition(event); + break; case MySQLConstants.TABLE_MAP_EVENT: tableCache.processEvent(this.schema, (TableMapEvent) v4Event); @@ -262,7 +266,10 @@ public MaxwellAbstractRowsEvent getEvent() throws Exception { v4Event = pollV4EventFromQueue(); - if (v4Event == null) return null; + if (v4Event == null) { + ensureReplicatorThread(); + return null; + } switch (v4Event.getHeader().getEventType()) { case MySQLConstants.WRITE_ROWS_EVENT: @@ -286,6 +293,8 @@ public MaxwellAbstractRowsEvent getEvent() throws Exception { default: break; } + + setReplicatorPosition((AbstractBinlogEventV4) v4Event); } } @@ -315,7 +324,7 @@ private void processQueryEvent(QueryEvent event) throws SchemaSyncError, SQLExce try ( Connection c = this.context.getConnectionPool().getConnection() ) { new SchemaStore(c, this.context.getServerID(), schema, p).save(); } - this.context.setInitialPositionSync(p); + this.context.setPositionSync(p); } } @@ -347,6 +356,10 @@ public void setFilter(MaxwellFilter filter) { this.filter = filter; } + private void setReplicatorPosition(AbstractBinlogEventV4 e) { + replicator.setBinlogFileName(e.getBinlogFilename()); + replicator.setBinlogPosition(e.getHeader().getNextPosition()); + } } diff --git a/src/main/java/com/zendesk/maxwell/StdoutProducer.java b/src/main/java/com/zendesk/maxwell/StdoutProducer.java index e0fe9b332..58995b47e 100644 --- a/src/main/java/com/zendesk/maxwell/StdoutProducer.java +++ b/src/main/java/com/zendesk/maxwell/StdoutProducer.java @@ -12,6 +12,6 @@ public void push(MaxwellAbstractRowsEvent e) throws Exception { for ( String json : e.toJSONStrings() ) { System.out.println(json); } - this.context.setInitialPosition(e.getNextBinlogPosition()); + this.context.setPosition(e); } } diff --git a/src/main/java/com/zendesk/maxwell/producer/FileProducer.java b/src/main/java/com/zendesk/maxwell/producer/FileProducer.java index 8895ba05f..6b5bc7002 100644 --- a/src/main/java/com/zendesk/maxwell/producer/FileProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/FileProducer.java @@ -24,6 +24,7 @@ public void push(MaxwellAbstractRowsEvent e) throws Exception { this.fileWriter.write('\n'); this.fileWriter.flush(); } - context.setInitialPosition(e.getNextBinlogPosition()); + + context.setPosition(e); } } diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java index 72534dcca..26154d253 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java @@ -45,7 +45,7 @@ public void onCompletion(RecordMetadata md, Exception e) { LOGGER.debug(""); } if ( this.lastRowInEvent ) { - context.setInitialPosition(event.getNextBinlogPosition()); + context.setPosition(event); } } catch (SQLException e1) { e1.printStackTrace(); @@ -60,13 +60,16 @@ public class MaxwellKafkaProducer extends AbstractProducer { "metadata.fetch.timeout.ms", 5000 }; private final KafkaProducer kafka; - private final String topic; + private String topic; private final int numPartitions; public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) { super(context); - topic = (kafkaTopic == null) ? "maxwell": kafkaTopic; + this.topic = kafkaTopic; + if ( this.topic == null ) { + this.topic = "maxwell"; + } this.setDefaults(kafkaProperties); this.kafka = new KafkaProducer<>(kafkaProperties, new ByteArraySerializer(), new ByteArraySerializer());