From 569ce2448a0ab5445a5c745fc29a364a3851b705 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Tue, 29 Sep 2015 13:21:08 -0700 Subject: [PATCH] bunch of fixes around replication positioning. 1. rename setInitialPosition() -> setPosition 2. only store our position into mysql if the event has the "commit" flag. This will prevent us from ending up stopped in the middle of a transaction, which in some cases caused us to miss a table-map event, and thusly miss rows. 3. make sure the replicator thread is running, even inside getTransactionEvents() -- the replicator thread was getting disconnected from out from under us, leading to a process that would never make progress. 4. always setup the replicator's position after processing an event. ensure that if we need to restart the thread it'll always be in the right place. I think I really need to refactor the MaxwellParser class. It's huge. --- .../java/com/zendesk/maxwell/Maxwell.java | 2 +- .../maxwell/MaxwellAbstractRowsEvent.java | 4 +++ .../com/zendesk/maxwell/MaxwellContext.java | 9 +++-- .../com/zendesk/maxwell/MaxwellParser.java | 35 +++++++++++++------ .../com/zendesk/maxwell/StdoutProducer.java | 2 +- .../maxwell/producer/FileProducer.java | 3 +- .../producer/MaxwellKafkaProducer.java | 9 +++-- 7 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/Maxwell.java b/src/main/java/com/zendesk/maxwell/Maxwell.java index f363c06a4..191666ded 100644 --- a/src/main/java/com/zendesk/maxwell/Maxwell.java +++ b/src/main/java/com/zendesk/maxwell/Maxwell.java @@ -29,7 +29,7 @@ private void initFirstRun(Connection connection) throws SQLException, IOExceptio SchemaStore store = new SchemaStore(connection, this.context.getServerID(), this.schema, pos); store.save(); - this.context.setInitialPosition(pos); + this.context.setPosition(pos); } private void run(String[] argv) throws Exception { diff --git a/src/main/java/com/zendesk/maxwell/MaxwellAbstractRowsEvent.java b/src/main/java/com/zendesk/maxwell/MaxwellAbstractRowsEvent.java index 8bb91a270..593fa94c6 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellAbstractRowsEvent.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellAbstractRowsEvent.java @@ -102,6 +102,10 @@ public void setTXCommit() { this.txCommit = true; } + public boolean isTXCommit() { + return txCommit; + } + @Override public String toString() { diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index f2e0cb9fb..75c4a0f56 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -56,11 +56,16 @@ public BinlogPosition getInitialPosition() throws FileNotFoundException, IOExcep return this.initialPosition; } - public void setInitialPosition(BinlogPosition position) throws SQLException { + public void setPosition(MaxwellAbstractRowsEvent e) throws SQLException { + if ( e.isTXCommit() ) { + this.setPosition(e.getNextBinlogPosition()); + } + } + public void setPosition(BinlogPosition position) throws SQLException { this.getSchemaPosition().set(position); } - public void setInitialPositionSync(BinlogPosition position) throws SQLException { + public void setPositionSync(BinlogPosition position) throws SQLException { this.getSchemaPosition().setSync(position); } diff --git a/src/main/java/com/zendesk/maxwell/MaxwellParser.java b/src/main/java/com/zendesk/maxwell/MaxwellParser.java index 2a26a2c6c..f16deba43 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellParser.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellParser.java @@ -99,17 +99,19 @@ public void stop(long timeoutMS) throws TimeoutException { throw new TimeoutException("Maxwell's main parser thread didn't die after " + timeoutMS + "ms."); } + private void ensureReplicatorThread() throws Exception { + if ( !replicator.isRunning() ) { + LOGGER.warn("open-replicator stopped at position " + replicator.getBinlogFileName() + ":" + replicator.getBinlogPosition() + " -- restarting"); + replicator.start(); + } + } + private void doRun() throws Exception { MaxwellAbstractRowsEvent event; while ( this.runState == RunState.RUNNING ) { event = getEvent(); - if ( !replicator.isRunning() ) { - LOGGER.warn("open-replicator stopped at position " + replicator.getBinlogFileName() + ":" + replicator.getBinlogPosition() + " -- restarting"); - replicator.start(); - } - context.ensurePositionThread(); if ( event == null ) @@ -118,9 +120,6 @@ private void doRun() throws Exception { if ( !skipEvent(event)) { producer.push(event); } - - replicator.setBinlogFileName(event.getBinlogFilename()); - replicator.setBinlogPosition(event.getHeader().getNextPosition()); } try { @@ -199,8 +198,10 @@ private LinkedList getTransactionEvents() throws Excep while ( true ) { v4Event = pollV4EventFromQueue(); - if (v4Event == null) + if (v4Event == null) { + ensureReplicatorThread(); continue; + } switch(v4Event.getHeader().getEventType()) { case MySQLConstants.WRITE_ROWS_EVENT: @@ -214,6 +215,9 @@ private LinkedList getTransactionEvents() throws Excep if ( event.matchesFilter() ) list.add(event); + + setReplicatorPosition(event); + break; case MySQLConstants.TABLE_MAP_EVENT: tableCache.processEvent(this.schema, (TableMapEvent) v4Event); @@ -262,7 +266,10 @@ public MaxwellAbstractRowsEvent getEvent() throws Exception { v4Event = pollV4EventFromQueue(); - if (v4Event == null) return null; + if (v4Event == null) { + ensureReplicatorThread(); + return null; + } switch (v4Event.getHeader().getEventType()) { case MySQLConstants.WRITE_ROWS_EVENT: @@ -286,6 +293,8 @@ public MaxwellAbstractRowsEvent getEvent() throws Exception { default: break; } + + setReplicatorPosition((AbstractBinlogEventV4) v4Event); } } @@ -315,7 +324,7 @@ private void processQueryEvent(QueryEvent event) throws SchemaSyncError, SQLExce try ( Connection c = this.context.getConnectionPool().getConnection() ) { new SchemaStore(c, this.context.getServerID(), schema, p).save(); } - this.context.setInitialPositionSync(p); + this.context.setPositionSync(p); } } @@ -347,6 +356,10 @@ public void setFilter(MaxwellFilter filter) { this.filter = filter; } + private void setReplicatorPosition(AbstractBinlogEventV4 e) { + replicator.setBinlogFileName(e.getBinlogFilename()); + replicator.setBinlogPosition(e.getHeader().getNextPosition()); + } } diff --git a/src/main/java/com/zendesk/maxwell/StdoutProducer.java b/src/main/java/com/zendesk/maxwell/StdoutProducer.java index e0fe9b332..58995b47e 100644 --- a/src/main/java/com/zendesk/maxwell/StdoutProducer.java +++ b/src/main/java/com/zendesk/maxwell/StdoutProducer.java @@ -12,6 +12,6 @@ public void push(MaxwellAbstractRowsEvent e) throws Exception { for ( String json : e.toJSONStrings() ) { System.out.println(json); } - this.context.setInitialPosition(e.getNextBinlogPosition()); + this.context.setPosition(e); } } diff --git a/src/main/java/com/zendesk/maxwell/producer/FileProducer.java b/src/main/java/com/zendesk/maxwell/producer/FileProducer.java index 8895ba05f..6b5bc7002 100644 --- a/src/main/java/com/zendesk/maxwell/producer/FileProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/FileProducer.java @@ -24,6 +24,7 @@ public void push(MaxwellAbstractRowsEvent e) throws Exception { this.fileWriter.write('\n'); this.fileWriter.flush(); } - context.setInitialPosition(e.getNextBinlogPosition()); + + context.setPosition(e); } } diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java index 72534dcca..26154d253 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java @@ -45,7 +45,7 @@ public void onCompletion(RecordMetadata md, Exception e) { LOGGER.debug(""); } if ( this.lastRowInEvent ) { - context.setInitialPosition(event.getNextBinlogPosition()); + context.setPosition(event); } } catch (SQLException e1) { e1.printStackTrace(); @@ -60,13 +60,16 @@ public class MaxwellKafkaProducer extends AbstractProducer { "metadata.fetch.timeout.ms", 5000 }; private final KafkaProducer kafka; - private final String topic; + private String topic; private final int numPartitions; public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) { super(context); - topic = (kafkaTopic == null) ? "maxwell": kafkaTopic; + this.topic = kafkaTopic; + if ( this.topic == null ) { + this.topic = "maxwell"; + } this.setDefaults(kafkaProperties); this.kafka = new KafkaProducer<>(kafkaProperties, new ByteArraySerializer(), new ByteArraySerializer());