Skip to content

Commit

Permalink
Merge pull request #115 from zendesk/ben/move_replicator_death_thread…
Browse files Browse the repository at this point in the history
…_check

bunch of fixes around replication positioning.
  • Loading branch information
Ben Osheroff committed Sep 30, 2015
2 parents a5a40f7 + 569ce24 commit d35e971
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/zendesk/maxwell/Maxwell.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ private void initFirstRun(Connection connection) throws SQLException, IOExceptio
SchemaStore store = new SchemaStore(connection, this.context.getServerID(), this.schema, pos);
store.save();

this.context.setInitialPosition(pos);
this.context.setPosition(pos);
}

private void run(String[] argv) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public void setTXCommit() {
this.txCommit = true;
}

public boolean isTXCommit() {
return txCommit;
}


@Override
public String toString() {
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/com/zendesk/maxwell/MaxwellContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ public BinlogPosition getInitialPosition() throws FileNotFoundException, IOExcep
return this.initialPosition;
}

public void setInitialPosition(BinlogPosition position) throws SQLException {
public void setPosition(MaxwellAbstractRowsEvent e) throws SQLException {
if ( e.isTXCommit() ) {
this.setPosition(e.getNextBinlogPosition());
}
}
public void setPosition(BinlogPosition position) throws SQLException {
this.getSchemaPosition().set(position);
}

public void setInitialPositionSync(BinlogPosition position) throws SQLException {
public void setPositionSync(BinlogPosition position) throws SQLException {
this.getSchemaPosition().setSync(position);
}

Expand Down
35 changes: 24 additions & 11 deletions src/main/java/com/zendesk/maxwell/MaxwellParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,19 @@ public void stop(long timeoutMS) throws TimeoutException {
throw new TimeoutException("Maxwell's main parser thread didn't die after " + timeoutMS + "ms.");
}

private void ensureReplicatorThread() throws Exception {
if ( !replicator.isRunning() ) {
LOGGER.warn("open-replicator stopped at position " + replicator.getBinlogFileName() + ":" + replicator.getBinlogPosition() + " -- restarting");
replicator.start();
}
}

private void doRun() throws Exception {
MaxwellAbstractRowsEvent event;

while ( this.runState == RunState.RUNNING ) {
event = getEvent();

if ( !replicator.isRunning() ) {
LOGGER.warn("open-replicator stopped at position " + replicator.getBinlogFileName() + ":" + replicator.getBinlogPosition() + " -- restarting");
replicator.start();
}

context.ensurePositionThread();

if ( event == null )
Expand All @@ -118,9 +120,6 @@ private void doRun() throws Exception {
if ( !skipEvent(event)) {
producer.push(event);
}

replicator.setBinlogFileName(event.getBinlogFilename());
replicator.setBinlogPosition(event.getHeader().getNextPosition());
}

try {
Expand Down Expand Up @@ -199,8 +198,10 @@ private LinkedList<MaxwellAbstractRowsEvent> getTransactionEvents() throws Excep

while ( true ) {
v4Event = pollV4EventFromQueue();
if (v4Event == null)
if (v4Event == null) {
ensureReplicatorThread();
continue;
}

switch(v4Event.getHeader().getEventType()) {
case MySQLConstants.WRITE_ROWS_EVENT:
Expand All @@ -214,6 +215,9 @@ private LinkedList<MaxwellAbstractRowsEvent> getTransactionEvents() throws Excep

if ( event.matchesFilter() )
list.add(event);

setReplicatorPosition(event);

break;
case MySQLConstants.TABLE_MAP_EVENT:
tableCache.processEvent(this.schema, (TableMapEvent) v4Event);
Expand Down Expand Up @@ -262,7 +266,10 @@ public MaxwellAbstractRowsEvent getEvent() throws Exception {

v4Event = pollV4EventFromQueue();

if (v4Event == null) return null;
if (v4Event == null) {
ensureReplicatorThread();
return null;
}

switch (v4Event.getHeader().getEventType()) {
case MySQLConstants.WRITE_ROWS_EVENT:
Expand All @@ -286,6 +293,8 @@ public MaxwellAbstractRowsEvent getEvent() throws Exception {
default:
break;
}

setReplicatorPosition((AbstractBinlogEventV4) v4Event);
}
}

Expand Down Expand Up @@ -315,7 +324,7 @@ private void processQueryEvent(QueryEvent event) throws SchemaSyncError, SQLExce
try ( Connection c = this.context.getConnectionPool().getConnection() ) {
new SchemaStore(c, this.context.getServerID(), schema, p).save();
}
this.context.setInitialPositionSync(p);
this.context.setPositionSync(p);
}
}

Expand Down Expand Up @@ -347,6 +356,10 @@ public void setFilter(MaxwellFilter filter) {
this.filter = filter;
}

private void setReplicatorPosition(AbstractBinlogEventV4 e) {
replicator.setBinlogFileName(e.getBinlogFilename());
replicator.setBinlogPosition(e.getHeader().getNextPosition());
}
}


Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/zendesk/maxwell/StdoutProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ public void push(MaxwellAbstractRowsEvent e) throws Exception {
for ( String json : e.toJSONStrings() ) {
System.out.println(json);
}
this.context.setInitialPosition(e.getNextBinlogPosition());
this.context.setPosition(e);
}
}
3 changes: 2 additions & 1 deletion src/main/java/com/zendesk/maxwell/producer/FileProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public void push(MaxwellAbstractRowsEvent e) throws Exception {
this.fileWriter.write('\n');
this.fileWriter.flush();
}
context.setInitialPosition(e.getNextBinlogPosition());

context.setPosition(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void onCompletion(RecordMetadata md, Exception e) {
LOGGER.debug("");
}
if ( this.lastRowInEvent ) {
context.setInitialPosition(event.getNextBinlogPosition());
context.setPosition(event);
}
} catch (SQLException e1) {
e1.printStackTrace();
Expand All @@ -60,13 +60,16 @@ public class MaxwellKafkaProducer extends AbstractProducer {
"metadata.fetch.timeout.ms", 5000
};
private final KafkaProducer<byte[], byte[]> kafka;
private final String topic;
private String topic;
private final int numPartitions;

public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) {
super(context);

topic = (kafkaTopic == null) ? "maxwell": kafkaTopic;
this.topic = kafkaTopic;
if ( this.topic == null ) {
this.topic = "maxwell";
}

this.setDefaults(kafkaProperties);
this.kafka = new KafkaProducer<>(kafkaProperties, new ByteArraySerializer(), new ByteArraySerializer());
Expand Down

0 comments on commit d35e971

Please sign in to comment.