Skip to content

Commit

Permalink
bunch of fixes around replication positioning.
Browse files Browse the repository at this point in the history
1. rename setInitialPosition() -> setPosition
2. only store our position into mysql if the event has the "commit"
   flag.  This will prevent us from ending up stopped in the middle of a
   transaction, which in some cases caused us to miss a table-map event,
   and thusly miss rows.
3. make sure the replicator thread is running, even inside
   getTransactionEvents() -- the replicator thread was getting
   disconnected from out from under us, leading to a process that would
   never make progress.
4. always setup the replicator's position after processing an event.
   ensure that if we need to restart the thread it'll always be in the
   right place.

I think I really need to refactor the MaxwellParser class.  It's huge.
  • Loading branch information
Ben Osheroff committed Sep 29, 2015
1 parent a5a40f7 commit 569ce24
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/zendesk/maxwell/Maxwell.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ private void initFirstRun(Connection connection) throws SQLException, IOExceptio
SchemaStore store = new SchemaStore(connection, this.context.getServerID(), this.schema, pos);
store.save();

this.context.setInitialPosition(pos);
this.context.setPosition(pos);
}

private void run(String[] argv) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public void setTXCommit() {
this.txCommit = true;
}

public boolean isTXCommit() {
return txCommit;
}


@Override
public String toString() {
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/com/zendesk/maxwell/MaxwellContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ public BinlogPosition getInitialPosition() throws FileNotFoundException, IOExcep
return this.initialPosition;
}

public void setInitialPosition(BinlogPosition position) throws SQLException {
public void setPosition(MaxwellAbstractRowsEvent e) throws SQLException {
if ( e.isTXCommit() ) {
this.setPosition(e.getNextBinlogPosition());
}
}
public void setPosition(BinlogPosition position) throws SQLException {
this.getSchemaPosition().set(position);
}

public void setInitialPositionSync(BinlogPosition position) throws SQLException {
public void setPositionSync(BinlogPosition position) throws SQLException {
this.getSchemaPosition().setSync(position);
}

Expand Down
35 changes: 24 additions & 11 deletions src/main/java/com/zendesk/maxwell/MaxwellParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,19 @@ public void stop(long timeoutMS) throws TimeoutException {
throw new TimeoutException("Maxwell's main parser thread didn't die after " + timeoutMS + "ms.");
}

private void ensureReplicatorThread() throws Exception {
if ( !replicator.isRunning() ) {
LOGGER.warn("open-replicator stopped at position " + replicator.getBinlogFileName() + ":" + replicator.getBinlogPosition() + " -- restarting");
replicator.start();
}
}

private void doRun() throws Exception {
MaxwellAbstractRowsEvent event;

while ( this.runState == RunState.RUNNING ) {
event = getEvent();

if ( !replicator.isRunning() ) {
LOGGER.warn("open-replicator stopped at position " + replicator.getBinlogFileName() + ":" + replicator.getBinlogPosition() + " -- restarting");
replicator.start();
}

context.ensurePositionThread();

if ( event == null )
Expand All @@ -118,9 +120,6 @@ private void doRun() throws Exception {
if ( !skipEvent(event)) {
producer.push(event);
}

replicator.setBinlogFileName(event.getBinlogFilename());
replicator.setBinlogPosition(event.getHeader().getNextPosition());
}

try {
Expand Down Expand Up @@ -199,8 +198,10 @@ private LinkedList<MaxwellAbstractRowsEvent> getTransactionEvents() throws Excep

while ( true ) {
v4Event = pollV4EventFromQueue();
if (v4Event == null)
if (v4Event == null) {
ensureReplicatorThread();
continue;
}

switch(v4Event.getHeader().getEventType()) {
case MySQLConstants.WRITE_ROWS_EVENT:
Expand All @@ -214,6 +215,9 @@ private LinkedList<MaxwellAbstractRowsEvent> getTransactionEvents() throws Excep

if ( event.matchesFilter() )
list.add(event);

setReplicatorPosition(event);

break;
case MySQLConstants.TABLE_MAP_EVENT:
tableCache.processEvent(this.schema, (TableMapEvent) v4Event);
Expand Down Expand Up @@ -262,7 +266,10 @@ public MaxwellAbstractRowsEvent getEvent() throws Exception {

v4Event = pollV4EventFromQueue();

if (v4Event == null) return null;
if (v4Event == null) {
ensureReplicatorThread();
return null;
}

switch (v4Event.getHeader().getEventType()) {
case MySQLConstants.WRITE_ROWS_EVENT:
Expand All @@ -286,6 +293,8 @@ public MaxwellAbstractRowsEvent getEvent() throws Exception {
default:
break;
}

setReplicatorPosition((AbstractBinlogEventV4) v4Event);
}
}

Expand Down Expand Up @@ -315,7 +324,7 @@ private void processQueryEvent(QueryEvent event) throws SchemaSyncError, SQLExce
try ( Connection c = this.context.getConnectionPool().getConnection() ) {
new SchemaStore(c, this.context.getServerID(), schema, p).save();
}
this.context.setInitialPositionSync(p);
this.context.setPositionSync(p);
}
}

Expand Down Expand Up @@ -347,6 +356,10 @@ public void setFilter(MaxwellFilter filter) {
this.filter = filter;
}

private void setReplicatorPosition(AbstractBinlogEventV4 e) {
replicator.setBinlogFileName(e.getBinlogFilename());
replicator.setBinlogPosition(e.getHeader().getNextPosition());
}
}


Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/zendesk/maxwell/StdoutProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ public void push(MaxwellAbstractRowsEvent e) throws Exception {
for ( String json : e.toJSONStrings() ) {
System.out.println(json);
}
this.context.setInitialPosition(e.getNextBinlogPosition());
this.context.setPosition(e);
}
}
3 changes: 2 additions & 1 deletion src/main/java/com/zendesk/maxwell/producer/FileProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public void push(MaxwellAbstractRowsEvent e) throws Exception {
this.fileWriter.write('\n');
this.fileWriter.flush();
}
context.setInitialPosition(e.getNextBinlogPosition());

context.setPosition(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void onCompletion(RecordMetadata md, Exception e) {
LOGGER.debug("");
}
if ( this.lastRowInEvent ) {
context.setInitialPosition(event.getNextBinlogPosition());
context.setPosition(event);
}
} catch (SQLException e1) {
e1.printStackTrace();
Expand All @@ -60,13 +60,16 @@ public class MaxwellKafkaProducer extends AbstractProducer {
"metadata.fetch.timeout.ms", 5000
};
private final KafkaProducer<byte[], byte[]> kafka;
private final String topic;
private String topic;
private final int numPartitions;

public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) {
super(context);

topic = (kafkaTopic == null) ? "maxwell": kafkaTopic;
this.topic = kafkaTopic;
if ( this.topic == null ) {
this.topic = "maxwell";
}

this.setDefaults(kafkaProperties);
this.kafka = new KafkaProducer<>(kafkaProperties, new ByteArraySerializer(), new ByteArraySerializer());
Expand Down

0 comments on commit 569ce24

Please sign in to comment.