Skip to content

Commit

Permalink
Merge pull request #184 from kristiankaufmann/emit_diff
Browse files Browse the repository at this point in the history
Emit diffs for updateRowsEvent
  • Loading branch information
Ben Osheroff committed Dec 10, 2015
2 parents c0598a9 + 6bd47e7 commit 62578e3
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 35 deletions.
32 changes: 19 additions & 13 deletions src/main/java/com/zendesk/maxwell/MaxwellAbstractRowsEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@

public abstract class MaxwellAbstractRowsEvent extends AbstractRowEvent {
static final Logger LOGGER = LoggerFactory.getLogger(MaxwellAbstractRowsEvent.class);
private final MaxwellFilter filter;
private final AbstractRowEvent event;

protected final Table table;
protected final Database database;
protected final MaxwellFilter filter;

public MaxwellAbstractRowsEvent(AbstractRowEvent e, Table table, MaxwellFilter f) {
this.tableId = e.getTableId();
Expand Down Expand Up @@ -166,19 +166,29 @@ public String toSQL() {
return sql.toString();
}

/**
 * Constructs an empty RowMap pre-populated with this event's metadata:
 * event type, database/table name, timestamp (binlog header is in
 * milliseconds; RowMap wants seconds), primary-key column list, and the
 * binlog position that follows this event.
 */
protected RowMap buildRowMap() {
	long timestampSeconds = getHeader().getTimestamp() / 1000;
	return new RowMap(
			getType(),
			getDatabase().getName(),
			getTable().getName(),
			timestampSeconds,
			table.getPKList(),
			this.getNextBinlogPosition());
}

/**
 * Converts a binlog column to the value that should be serialized to JSON.
 * DATETIME columns are unwrapped to their long representation; every other
 * column type is passed through as-is.
 */
protected Object valueForJson(Column c) {
	return (c instanceof DatetimeColumn)
			? ((DatetimeColumn) c).getLongValue()
			: c.getValue();
}

public List<RowMap> jsonMaps() {
ArrayList<RowMap> list = new ArrayList<>();
Object value;
for ( Iterator<Row> ri = filteredRows().iterator() ; ri.hasNext(); ) {
Row r = ri.next();

RowMap rowMap = new RowMap(
getType(),
getDatabase().getName(),
getTable().getName(),
getHeader().getTimestamp() / 1000,
table.getPKList(),
this.getNextBinlogPosition());
RowMap rowMap = buildRowMap();

Iterator<Column> colIter = r.getColumns().iterator();
Iterator<ColumnDef> defIter = table.getColumnList().iterator();
Expand All @@ -187,11 +197,7 @@ public List<RowMap> jsonMaps() {
Column c = colIter.next();
ColumnDef d = defIter.next();

if (c instanceof DatetimeColumn) {
value = ((DatetimeColumn) c).getLongValue();
} else {
value = c.getValue();
}
value = valueForJson(c);

if ( value != null )
value = d.asJSON(value);
Expand Down
64 changes: 64 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellUpdateRowsEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

import java.util.ArrayList;
import java.util.List;
import java.util.*;

import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEventV2;
import com.google.code.or.common.glossary.Column;
import com.google.code.or.common.glossary.Pair;
import com.google.code.or.common.glossary.Row;
import com.google.code.or.common.glossary.column.DatetimeColumn;
import com.zendesk.maxwell.schema.Table;
import com.zendesk.maxwell.schema.columndef.ColumnDef;

public class MaxwellUpdateRowsEvent extends MaxwellAbstractRowsEvent {
private final UpdateRowsEvent event;
Expand Down Expand Up @@ -48,4 +52,64 @@ public String sqlOperationString() {
public String getType() {
return "update";
}

// Lazily-computed cache of the rows that survive the filter; valid only
// once performedBeforeAndAfterFilter is true.
private LinkedList<Pair<Row>> filteredRowsBeforeAndAfter;
private boolean performedBeforeAndAfterFilter;

/**
 * Returns the event's before/after row pairs, restricted to those whose
 * *after* image matches the configured filter. With no filter configured,
 * the event's full row list is returned unfiltered. The filtered result is
 * computed once and cached for subsequent calls.
 */
private List<Pair<Row>> filteredRowsBeforeAndAfter() {
	if ( this.filter == null )
		return event.getRows();

	if ( !performedBeforeAndAfterFilter ) {
		LinkedList<Pair<Row>> kept = new LinkedList<>();
		for ( Pair<Row> pair : event.getRows() ) {
			// filtering decisions are made against the post-update image
			if ( this.filter.matchesRow(this, pair.getAfter()) )
				kept.add(pair);
		}
		filteredRowsBeforeAndAfter = kept;
		performedBeforeAndAfterFilter = true;
	}
	return filteredRowsBeforeAndAfter;
}

/**
 * Builds one RowMap per filtered row pair. Each map carries the full
 * post-update ("data") values; for every column whose JSON value changed,
 * the pre-update value is additionally recorded via putOldData so the
 * emitted JSON can include an "old" diff section.
 */
@Override
public List<RowMap> jsonMaps() {
	List<RowMap> maps = new ArrayList<>();

	for ( Pair<Row> pair : filteredRowsBeforeAndAfter() ) {
		RowMap map = buildRowMap();

		Iterator<Column> afterCols = pair.getAfter().getColumns().iterator();
		Iterator<Column> beforeCols = pair.getBefore().getColumns().iterator();
		Iterator<ColumnDef> defs = table.getColumnList().iterator();

		// walk the three streams in lockstep; stop at the shortest
		while ( afterCols.hasNext() && defs.hasNext() && beforeCols.hasNext() ) {
			Column afterColumn = afterCols.next();
			ColumnDef columnDef = defs.next();
			Column beforeColumn = beforeCols.next();

			Object newValue = valueForJson(afterColumn);
			if ( newValue != null )
				newValue = columnDef.asJSON(newValue);

			Object oldValue = valueForJson(beforeColumn);
			if ( oldValue != null )
				oldValue = columnDef.asJSON(oldValue);

			// only record the old value when the column actually changed
			if ( !Objects.equals(newValue, oldValue) )
				map.putOldData(columnDef.getName(), oldValue);

			map.putData(columnDef.getName(), newValue);
		}
		maps.add(map);
	}

	return maps;
}
}
61 changes: 41 additions & 20 deletions src/main/java/com/zendesk/maxwell/RowMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class RowMap implements Serializable {
private boolean txCommit;

private final HashMap<String, Object> data;
private final HashMap<String, Object> oldData;
private final List<String> pkColumns;

private static final JsonFactory jsonFactory = new JsonFactory();
Expand Down Expand Up @@ -60,6 +61,7 @@ public RowMap(String type, String database, String table, Long timestamp, List<S
this.table = table;
this.timestamp = timestamp;
this.data = new HashMap<>();
this.oldData = new HashMap<>();
this.nextPosition = nextPosition;
this.pkColumns = pkColumns;
}
Expand Down Expand Up @@ -89,6 +91,33 @@ public String pkToJson() throws IOException {
return jsonFromStream();
}

/**
 * Serializes a key/value map as a named JSON object ("data" or "old")
 * using the thread-local JsonGenerator.
 *
 * Fixes vs. original: iterates entrySet() instead of keySet()+get(key)
 * (one lookup instead of two per entry), renames the parameter so it no
 * longer shadows the RowMap.data field, scopes the unavoidable unchecked
 * cast, and drops the redundant trailing return.
 *
 * @param jsonMapName      the JSON field name the object is written under
 * @param map              the column-name -> value pairs to serialize
 * @param includeNullField when false, entries with a null value are skipped;
 *                         when true they are emitted as JSON null
 * @throws IOException if the generator's underlying write fails
 */
private void writeMapToJSON(String jsonMapName, HashMap<String, Object> map, boolean includeNullField) throws IOException {
	JsonGenerator generator = jsonGeneratorThreadLocal.get();
	generator.writeObjectFieldStart(jsonMapName); // start of jsonMapName: {

	/* TODO: maintain ordering of fields in column order */
	for ( java.util.Map.Entry<String, Object> entry : map.entrySet() ) {
		String key = entry.getKey();
		Object value = entry.getValue();

		if ( value == null && !includeNullField )
			continue;

		if ( value instanceof List ) { // sets come back from .asJSON as lists, and jackson can't deal with lists natively.
			@SuppressWarnings("unchecked")
			List<String> stringList = (List<String>) value;

			generator.writeArrayFieldStart(key);
			for ( String s : stringList ) {
				generator.writeString(s);
			}
			generator.writeEndArray();
		} else {
			generator.writeObjectField(key, value);
		}
	}

	generator.writeEndObject(); // end of 'jsonMapName: { }'
}

public String toJSON() throws IOException {
JsonGenerator g = jsonGeneratorThreadLocal.get();
Expand All @@ -107,28 +136,12 @@ public String toJSON() throws IOException {
if ( this.txCommit )
g.writeBooleanField("commit", true);

g.writeObjectFieldStart("data"); // start of data: {

/* TODO: maintain ordering of fields in column order */
for ( String key: this.data.keySet() ) {
Object data = this.data.get(key);

if ( data == null )
continue;

if ( data instanceof List) { // sets come back from .asJSON as lists, and jackson can't deal with lists natively.
List<String> stringList = (List<String>) data;
writeMapToJSON("data", this.data, false);

g.writeArrayFieldStart(key);
for ( String s : stringList ) {
g.writeString(s);
}
g.writeEndArray();
} else {
g.writeObjectField(key, data);
}
if ( !this.oldData.isEmpty()) {
writeMapToJSON("old", this.oldData, true);
}
g.writeEndObject(); // end of 'data: { }'

g.writeEndObject(); // end of row
g.flush();

Expand All @@ -150,6 +163,14 @@ public void putData(String key, Object value) {
this.data.put(key, value);
}

/**
 * Returns the pre-update ("old") value recorded for the given column name,
 * or null if no old value was recorded for it.
 */
public Object getOldData(String key) {
	return this.oldData.get(key);
}

/**
 * Records a column's pre-update value; a non-empty oldData map causes an
 * "old" object to be emitted in toJSON().
 */
public void putOldData(String key, Object value) {
	this.oldData.put(key, value);
}

/**
 * Returns the binlog position stored for this row. NOTE(review): despite
 * the method name, this returns the nextPosition field (the position
 * following the event, as passed to the constructor) — confirm callers
 * expect the post-event position.
 */
public BinlogPosition getPosition() {
	return nextPosition;
}
Expand Down
20 changes: 18 additions & 2 deletions src/test/resources/sql/json/test_1j
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,29 @@ insert into test_table set textcol='test_col_2', datecol='1979-10-01 00:00:00'

update test_table set textcol='test_col_5' where id = 1

-> {"database": "test_db_1", "table": "test_table", "type": "update", "data": {"id": 1, "textcol": "test_col_5"} }
-> {"database": "test_db_1", "table": "test_table", "type": "update", "data": {"id": 1, "textcol": "test_col_5"}, "old": {"textcol": "test_col_1"} }

update test_table set datecol='1979-10-01 00:00:00' where id = 1

-> {"database": "test_db_1", "table": "test_table", "type": "update", "data": {"id": 1, "textcol": "test_col_5", "datecol": "1979-10-01 00:00:00" }, "old": {"datecol": null} }

alter table test_table add column `dummycol` mediumint(10) default 10 AFTER id

update test_table set dummycol=5 where id = 1

-> {"database": "test_db_1", "table": "test_table", "type": "update", "data": {"id": 1, "textcol": "test_col_5", "datecol": "1979-10-01 00:00:00", "dummycol": 5}, "old": {"dummycol": 10} }

alter table test_table drop column `dummycol`

update test_table set datecol='1970-01-01 00:00:00' where id = 1

-> {"database": "test_db_1", "table": "test_table", "type": "update", "data": {"id": 1, "textcol": "test_col_5", "datecol": "1970-01-01 00:00:00" }, "old": {"datecol": "1979-10-01 00:00:00"} }

delete from test_table;

/*!40000 ALTER TABLE `test_db_1`.`test_table` ENABLE KEYS */

-> {"database": "test_db_1", "table": "test_table", "type": "delete", "data": {"id": 1, "textcol": "test_col_5"} }
-> {"database": "test_db_1", "table": "test_table", "type": "delete", "data": {"id": 1, "textcol": "test_col_5", "datecol":"1970-01-01 00:00:00"} }
-> {"database": "test_db_1", "table": "test_table", "type": "delete", "data": {"id": 2, "textcol": "test_col_2"} }
-> {"database": "test_db_1", "table": "test_table", "type": "delete", "data": {"id": 3, "textcol": "test_col_2", "datecol": "1979-10-01 00:00:00"} }

Expand Down

0 comments on commit 62578e3

Please sign in to comment.