From c28246fc66293aee1bf6bf5d33e272f29b668708 Mon Sep 17 00:00:00 2001 From: Gil Cottle Date: Tue, 6 Jul 2021 10:31:41 -0700 Subject: [PATCH 1/3] Intern ColumnDefs to reduce memory by more than half for multi-tenant DB use-cases This is part of a series of PRs o support better multi-tenant DB use-cases that are typically not served well by most CDC systems. This PR does not include any special modes for multi-tenant, it simply reorganizes the memory/code to make it more performant for this use-case. Baseline instance: 70K DBa, 1M Tablea, 6M columns Before/After Change Schema retained size (from heap dump): 350MB/140MB Make ColumnDefs and TableColumnLists immutable and intern them. This is all about optimizing on the fact that we see the same column and table definitions over and over in multi-tenant DBs. There is a negligible hit on modify performance when DDL changes occur to rebuild these immutable objects. This doesn't hurt existing use-case performance where DDL changes occur infrequently and greatly improves memory usage for multi-tenant use-cases. --- .../maxwell/schema/MysqlSavedSchema.java | 70 ++++---- .../com/zendesk/maxwell/schema/Schema.java | 33 +++- .../com/zendesk/maxwell/schema/Table.java | 16 +- .../maxwell/schema/TableColumnList.java | 116 +++++++++--- .../schema/columndef/BigIntColumnDef.java | 32 +++- .../schema/columndef/BitColumnDef.java | 7 +- .../maxwell/schema/columndef/ColumnDef.java | 99 ++++++++--- .../schema/columndef/ColumnDefSerializer.java | 2 +- .../schema/columndef/ColumnDefWithLength.java | 28 ++- .../schema/columndef/DateColumnDef.java | 7 +- .../schema/columndef/DateFormatter.java | 8 +- .../schema/columndef/DateTimeColumnDef.java | 15 +- .../schema/columndef/DecimalColumnDef.java | 7 +- .../schema/columndef/EnumColumnDef.java | 9 +- .../schema/columndef/EnumeratedColumnDef.java | 34 +++- .../schema/columndef/FloatColumnDef.java | 8 +- .../schema/columndef/GeometryColumnDef.java | 7 +- .../schema/columndef/IntColumnDef.java | 35 +++- .../schema/columndef/JsonColumnDef.java | 7 +- .../schema/columndef/SetColumnDef.java | 13 +- .../schema/columndef/StringColumnDef.java | 56 +++++- .../schema/columndef/TimeColumnDef.java | 11 +- .../schema/columndef/YearColumnDef.java | 7 +- .../maxwell/schema/ddl/TableAlter.java | 5 +- .../maxwell/schema/SchemaCaptureTest.java | 4 +- .../schema/columndef/ColumnDefTest.java | 166 +++++++++++++++++- 26 files changed, 635 insertions(+), 167 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/schema/MysqlSavedSchema.java b/src/main/java/com/zendesk/maxwell/schema/MysqlSavedSchema.java index 954ec2d6a..3bdd9264a 100644 --- a/src/main/java/com/zendesk/maxwell/schema/MysqlSavedSchema.java +++ b/src/main/java/com/zendesk/maxwell/schema/MysqlSavedSchema.java @@ -16,7 +16,6 @@ import com.zendesk.maxwell.util.ConnectionPool; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.text.StrTokenizer; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -635,27 +634,29 @@ public Position getPosition() { private void fixUnsignedColumns(Schema recaptured) throws SQLException, InvalidSchemaError { int unsignedDiffs = 0; - for ( Pair pair : schema.matchColumns(recaptured) ) { - ColumnDef cA = pair.getLeft(); - ColumnDef cB = pair.getRight(); + for ( Pair pair : schema.matchColumns(recaptured) ) { + Table schemaTable = pair.getLeft().getTable(); + ColumnDef schemaCol = pair.getLeft().getColumnDef(); + ColumnDef recapturedCol = pair.getRight().getColumnDef(); - if (cA instanceof IntColumnDef) { - if (cB != null && cB instanceof IntColumnDef) { - if (((IntColumnDef) cA).isSigned() && !((IntColumnDef) cB).isSigned()) { - ((IntColumnDef) cA).setSigned(false); + if (schemaCol instanceof IntColumnDef) { + if (recapturedCol != null && recapturedCol instanceof IntColumnDef) { + if (((IntColumnDef) schemaCol).isSigned() && !((IntColumnDef) recapturedCol).isSigned()) { + schemaTable.replaceColumn(schemaCol.getPos(), ((IntColumnDef) schemaCol).withSigned(false)); unsignedDiffs++; } } else { - LOGGER.warn("warning: Couldn't check for unsigned integer bug on column " + cA.getName() + + LOGGER.warn("warning: Couldn't check for unsigned integer bug on column " + schemaCol.getName() + ". You may want to recapture your schema"); } - } else if (cA instanceof BigIntColumnDef) { - if (cB != null && cB instanceof BigIntColumnDef) { - if (((BigIntColumnDef) cA).isSigned() && !((BigIntColumnDef) cB).isSigned()) - ((BigIntColumnDef) cA).setSigned(false); + } else if (schemaCol instanceof BigIntColumnDef) { + if (recapturedCol != null && recapturedCol instanceof BigIntColumnDef) { + if (((BigIntColumnDef) schemaCol).isSigned() && !((BigIntColumnDef) recapturedCol).isSigned()) { + schemaTable.replaceColumn(schemaCol.getPos(), ((BigIntColumnDef) schemaCol).withSigned(false)); + } unsignedDiffs++; } else { - LOGGER.warn("warning: Couldn't check for unsigned integer bug on column " + cA.getName() + + LOGGER.warn("warning: Couldn't check for unsigned integer bug on column " + schemaCol.getName() + ". You may want to recapture your schema"); } } @@ -675,43 +676,42 @@ private void fixUnsignedColumns(Schema recaptured) throws SQLException, InvalidS } } - private void fixColumnCases(Schema recaptured) throws SQLException { + private void fixColumnCases(Schema recaptured) throws InvalidSchemaError { int caseDiffs = 0; - for ( Pair pair : schema.matchColumns(recaptured) ) { - ColumnDef cA = pair.getLeft(); - ColumnDef cB = pair.getRight(); + for (Pair pair : schema.matchColumns(recaptured)) { + Table schemaTable = pair.getLeft().getTable(); + ColumnDef schemaCol = pair.getLeft().getColumnDef(); + ColumnDef recapturedCol = pair.getRight().getColumnDef(); - if ( !cA.getName().equals(cB.getName()) ) { - LOGGER.info("correcting column case of `" + cA.getName() + "` to `" + cB.getName() + "`. Will save a full schema snapshot after the new DDL update is processed."); + if (!schemaCol.getName().equals(recapturedCol.getName())) { + LOGGER.info("correcting column case of `" + schemaCol.getName() + "` to `" + recapturedCol.getName() + "`. Will save a full schema snapshot after the new DDL update is processed."); caseDiffs++; - cA.setName(cB.getName()); + schemaTable.replaceColumn(schemaCol.getPos(), schemaCol.withName(recapturedCol.getName())); } } - - if ( caseDiffs > 0 ) - this.shouldSnapshotNextSchema = true; } - private void fixColumnLength(Schema recaptured) throws SQLException { + private void fixColumnLength(Schema recaptured) throws InvalidSchemaError { int colLengthDiffs = 0; - for ( Pair pair : schema.matchColumns(recaptured) ) { - ColumnDef cA = pair.getLeft(); - ColumnDef cB = pair.getRight(); + for ( Pair pair : schema.matchColumns(recaptured) ) { + Table schemaTable = pair.getLeft().getTable(); + ColumnDef schemaCol = pair.getLeft().getColumnDef(); + ColumnDef recapturedCol = pair.getRight().getColumnDef(); - if (cA instanceof ColumnDefWithLength) { - if (cB != null && cB instanceof ColumnDefWithLength) { - long aColLength = ((ColumnDefWithLength) cA).getColumnLength(); - long bColLength = ((ColumnDefWithLength) cB).getColumnLength(); + if (schemaCol instanceof ColumnDefWithLength) { + if (recapturedCol != null && recapturedCol instanceof ColumnDefWithLength) { + long aColLength = ((ColumnDefWithLength) schemaCol).getColumnLength(); + long bColLength = ((ColumnDefWithLength) recapturedCol).getColumnLength(); if ( aColLength != bColLength ) { colLengthDiffs++; - LOGGER.info("correcting column length of `" + cA.getName() + "` to " + bColLength + ". Will save a full schema snapshot after the new DDL update is processed."); - ((ColumnDefWithLength) cA).setColumnLength(bColLength); + LOGGER.info("correcting column length of `" + schemaCol.getName() + "` to " + bColLength + ". Will save a full schema snapshot after the new DDL update is processed."); + schemaTable.replaceColumn(schemaCol.getPos(), ((ColumnDefWithLength) schemaCol).withColumnLength(bColLength)); } } else { - LOGGER.warn("warning: Couldn't check for column length on column " + cA.getName() + + LOGGER.warn("warning: Couldn't check for column length on column " + schemaCol.getName() + ". You may want to recapture your schema"); } } diff --git a/src/main/java/com/zendesk/maxwell/schema/Schema.java b/src/main/java/com/zendesk/maxwell/schema/Schema.java index 741295277..4c60bf4c7 100644 --- a/src/main/java/com/zendesk/maxwell/schema/Schema.java +++ b/src/main/java/com/zendesk/maxwell/schema/Schema.java @@ -94,8 +94,8 @@ public CaseSensitivity getCaseSensitivity() { return sensitivity; }; - public List> matchColumns(Schema thatSchema) { - ArrayList> list = new ArrayList<>(); + public List> matchColumns(Schema thatSchema) { + ArrayList> list = new ArrayList<>(); for ( Database thisDatabase : this.getDatabases() ) { Database thatDatabase = thatSchema.findDatabase(thisDatabase.getName()); @@ -112,10 +112,37 @@ public List> matchColumns(Schema thatSchema) { for ( ColumnDef thisColumn : thisTable.getColumnList() ) { ColumnDef thatColumn = thatTable.findColumn(thisColumn.getName()); if ( thatColumn != null ) - list.add(Pair.of(thisColumn, thatColumn)); + list.add(Pair.of( + new FullColumnDef(thisDatabase, thisTable, thisColumn), + new FullColumnDef(thatDatabase, thatTable, thatColumn) + )); } } } return list; } + + public static class FullColumnDef { + private final Database db; + private final Table table; + private final ColumnDef columnDef; + + public FullColumnDef(Database db, Table table, ColumnDef columnDef) { + this.db = db; + this.table = table; + this.columnDef = columnDef; + } + + public Database getDb() { + return db; + } + + public Table getTable() { + return table; + } + + public ColumnDef getColumnDef() { + return columnDef; + } + } } diff --git a/src/main/java/com/zendesk/maxwell/schema/Table.java b/src/main/java/com/zendesk/maxwell/schema/Table.java index 227df4fe7..2243ba21e 100644 --- a/src/main/java/com/zendesk/maxwell/schema/Table.java +++ b/src/main/java/com/zendesk/maxwell/schema/Table.java @@ -142,7 +142,7 @@ private void diffColumnList(List diffs, Table a, Table b, String nameA, EnumeratedColumnDef enumA, enumB; enumA = (EnumeratedColumnDef) column; enumB = (EnumeratedColumnDef) other; - if ( !Arrays.deepEquals(enumA.getEnumValues(), enumB.getEnumValues()) ) { + if ( !enumA.getEnumValues().equals(enumB.getEnumValues()) ) { diffs.add(colName + "has an enum value mismatch, " + StringUtils.join(enumA.getEnumValues(), ",") + " vs " @@ -223,8 +223,10 @@ public void diff(List diffs, Table other, String nameA, String nameB) { } public void setDefaultColumnCharsets() { + String newCharset = this.getCharset(); for ( StringColumnDef c : getStringColumns() ) { - c.setDefaultCharset(this.getCharset()); + int index = c.getPos(); + columns.replace(index, c.withDefaultCharset(newCharset)); } } @@ -243,10 +245,12 @@ public void removeColumn(int idx) { } public void renameColumn(int idx, String name) throws InvalidSchemaError { - ColumnDef column = columns.get(idx).clone(); - column.setName(name); - columns.remove(idx); - columns.add(idx, column); + ColumnDef column = columns.get(idx).withName(name); + columns.replace(idx, column); + } + + public void replaceColumn(int idx, ColumnDef definition) throws InvalidSchemaError { + columns.replace(idx, definition); } public void changeColumn(int idx, ColumnPosition position, ColumnDef definition, List deferred) throws InvalidSchemaError { diff --git a/src/main/java/com/zendesk/maxwell/schema/TableColumnList.java b/src/main/java/com/zendesk/maxwell/schema/TableColumnList.java index cadcb5f2d..9d4e2cbce 100644 --- a/src/main/java/com/zendesk/maxwell/schema/TableColumnList.java +++ b/src/main/java/com/zendesk/maxwell/schema/TableColumnList.java @@ -2,36 +2,40 @@ import java.util.*; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; import com.zendesk.maxwell.schema.columndef.ColumnDef; public class TableColumnList implements Iterable { - private final List columns; - private Set columnNames; + // reduce count of duplicate ArrayLists/Sets for column lists by providing mutability for the class + // through references to an internal immutable object that gets interned. This greatly reduces overhead for + // table definitions that are duplicated across databases + private InternalImmutableTableColumnList internalState; public TableColumnList(List columns) { - this.columns = columns; - renumberColumns(); + this.internalState = InternalImmutableTableColumnList.create(columns); } public Iterator iterator() { - return columns.iterator(); + return internalState.getColumns().iterator(); } public List getList() { - return columns; + return internalState.getColumns(); } public synchronized Set columnNames() { - if ( columnNames == null ) { - columnNames = new HashSet<>(); - for ( ColumnDef cf : columns ) - columnNames.add(cf.getName().toLowerCase().intern()); - } - return columnNames; + return internalState.getColumnNames(); } public synchronized int indexOf(String name) { + return indexOf(internalState.getColumns(), name); + } + + private synchronized int indexOf(List columns, String name) { String lcName = name.toLowerCase(); for ( int i = 0 ; i < columns.size(); i++ ) { @@ -42,7 +46,8 @@ public synchronized int indexOf(String name) { } public ColumnDef findByName(String name) { - int index = indexOf(name); + List columns = internalState.getColumns(); + int index = indexOf(columns, name); if ( index == -1 ) return null; else @@ -50,35 +55,88 @@ public ColumnDef findByName(String name) { } public synchronized void add(int index, ColumnDef definition) { - columns.add(index, definition); - - if ( columnNames != null ) - columnNames.add(definition.getName().toLowerCase()); + List columns = internalState.getColumns(); + ArrayList tempList = new ArrayList<>(columns.size() + 1); + tempList.addAll(columns); + tempList.add(index, definition); + internalState = InternalImmutableTableColumnList.create(tempList); + } - renumberColumns(); + public synchronized void replace(int index, ColumnDef definition) { + List columns = internalState.getColumns(); + ArrayList tempList = new ArrayList<>(columns.size()); + tempList.addAll(columns); + tempList.set(index, definition); + internalState = InternalImmutableTableColumnList.create(tempList); } public synchronized ColumnDef remove(int index) { - ColumnDef c = columns.remove(index); - - if ( columnNames != null ) - columnNames.remove(c.getName().toLowerCase()); - renumberColumns(); + List columns = internalState.getColumns(); + ArrayList tempList = new ArrayList<>(columns.size()); + tempList.addAll(columns); + ColumnDef c = tempList.remove(index); + internalState = InternalImmutableTableColumnList.create(tempList); return c; } public synchronized ColumnDef get(int index) { - return columns.get(index); + return internalState.getColumns().get(index); } public int size() { - return columns.size(); + return internalState.getColumns().size(); } - private void renumberColumns() { - short i = 0 ; - for ( ColumnDef c : columns ) { - c.setPos(i++); + private static final class InternalImmutableTableColumnList { + private static final Interner INTERNER = Interners.newWeakInterner(); + + private final List columns; + private Set columnNames; // not part of equals because it's derived statically + + private InternalImmutableTableColumnList(List columns) { + ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(columns.size()); + int i = 0; + for (ColumnDef column : columns) { + builder.add(column.withPos((short) i++)); + } + this.columns = builder.build(); + } + + public static InternalImmutableTableColumnList create(List columns) { + return INTERNER.intern(new InternalImmutableTableColumnList(columns)); + } + + @Override + public boolean equals(Object o) { + if (o instanceof InternalImmutableTableColumnList) { + InternalImmutableTableColumnList other = (InternalImmutableTableColumnList) o; + return columns.equals(other.columns); + } + return false; + } + + @Override + public int hashCode() { + return columns.hashCode(); + } + + public List getColumns() { + return columns; + } + + public Set getColumnNames() { + if ( columnNames == null ) { + columnNames = generateColumnNames(); + } + return columnNames; + } + + private Set generateColumnNames() { + ImmutableSet.Builder setBuilder = ImmutableSet.builderWithExpectedSize(columns.size()); + for ( ColumnDef cf : columns ) { + setBuilder.add(cf.getName().toLowerCase().intern()); + } + return setBuilder.build(); } } } diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/BigIntColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/BigIntColumnDef.java index 03f8ce35a..bcbcdf277 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/BigIntColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/BigIntColumnDef.java @@ -3,17 +3,39 @@ import com.zendesk.maxwell.producer.MaxwellOutputConfig; import java.math.BigInteger; +import java.util.Objects; public class BigIntColumnDef extends ColumnDef { static private final BigInteger longlong_max = BigInteger.ONE.shiftLeft(64); - protected boolean signed; + private boolean signed; - public BigIntColumnDef(String name, String type, short pos, boolean signed) { + private BigIntColumnDef(String name, String type, short pos, boolean signed) { super(name, type, pos); this.signed = signed; } + public static BigIntColumnDef create(String name, String type, short pos, boolean signed) { + BigIntColumnDef temp = new BigIntColumnDef(name, type, pos, signed); + return (BigIntColumnDef) INTERNER.intern(temp); + } + + @Override + public boolean equals(Object o) { + if (o.getClass() == getClass()) { + BigIntColumnDef other = (BigIntColumnDef)o; + return super.equals(o) + && signed == other.signed; + } + return false; + } + + @Override + public int hashCode() { + int hash = super.hashCode(); + return 31 * hash + Objects.hash(signed); + } + private Object toNumeric(Object value) throws ColumnDefCastException { if ( value instanceof BigInteger ) { return value; @@ -41,7 +63,9 @@ public boolean isSigned() { return signed; } - public void setSigned(boolean signed) { - this.signed = signed; + public BigIntColumnDef withSigned(boolean signed) { + return cloneSelfAndSet(clone -> { + clone.signed = signed; + }); } } diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/BitColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/BitColumnDef.java index d9222e7dc..ac1931bbd 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/BitColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/BitColumnDef.java @@ -6,10 +6,15 @@ import java.util.BitSet; public class BitColumnDef extends ColumnDef { - public BitColumnDef(String name, String type, short pos) { + private BitColumnDef(String name, String type, short pos) { super(name, type, pos); } + public static BitColumnDef create(String name, String type, short pos) { + BitColumnDef temp = new BitColumnDef(name, type, pos); + return (BitColumnDef) INTERNER.intern(temp); + } + @Override public Object asJSON(Object value, MaxwellOutputConfig outputConfig) throws ColumnDefCastException { byte[] bytes; diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDef.java index d777c8399..ffe91bf5d 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDef.java @@ -1,28 +1,60 @@ package com.zendesk.maxwell.schema.columndef; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; import com.zendesk.maxwell.producer.MaxwellOutputConfig; import com.zendesk.maxwell.util.DynamicEnum; +import java.util.Objects; +import java.util.function.Consumer; + +/** + * This class is immutable, all subclasses must be immutable and implement equals and hashCode and call this + * class's respective methods if the subclass has any member variables. Failure to do so will lead difficult + * to debug errors as these class instances are interned. Subclasses may use {@link #cloneSelfAndSet} to + * follow clone/modify/intern pattern for maintaining interface immutability. + * + */ @JsonSerialize(using=ColumnDefSerializer.class) @JsonDeserialize(using=ColumnDefDeserializer.class) - public abstract class ColumnDef implements Cloneable { - private static DynamicEnum dynamicEnum = new DynamicEnum(Byte.MAX_VALUE); - protected String name; - protected byte type; - protected short pos; + protected static final Interner INTERNER = Interners.newWeakInterner(); + private static final DynamicEnum dynamicEnum = new DynamicEnum(Byte.MAX_VALUE); + private String name; + private final byte type; + private short pos; - public ColumnDef() { } - public ColumnDef(String name, String type, short pos) { + protected ColumnDef(String name, String type, short pos) { this.name = name; this.pos = pos; this.type = (byte) dynamicEnum.get(type); } + @Override + public boolean equals(Object o) { + if (o instanceof ColumnDef && o.getClass() == getClass()) { + ColumnDef other = (ColumnDef) o; + return Objects.equals(name, other.name) + && Objects.equals(pos, other.pos) + && Objects.equals(type, other.type); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(name, type, pos); + } + public abstract String toSQL(Object value) throws ColumnDefCastException; + protected Interner getInterner() { + // maintain default interner + return (Interner) INTERNER; + } + @Deprecated public Object asJSON(Object value) throws ColumnDefCastException { return asJSON(value, new MaxwellOutputConfig()); @@ -50,23 +82,23 @@ public static ColumnDef build(String name, String charset, String type, short po case "smallint": case "mediumint": case "int": - return new IntColumnDef(name, type, pos, signed); + return IntColumnDef.create(name, type, pos, signed); case "bigint": - return new BigIntColumnDef(name, type, pos, signed); + return BigIntColumnDef.create(name, type, pos, signed); case "tinytext": case "text": case "mediumtext": case "longtext": case "varchar": case "char": - return new StringColumnDef(name, type, pos, charset); + return StringColumnDef.create(name, type, pos, charset); case "tinyblob": case "blob": case "mediumblob": case "longblob": case "binary": case "varbinary": - return new StringColumnDef(name, type, pos, "binary"); + return StringColumnDef.create(name, type, pos, "binary"); case "geometry": case "geometrycollection": case "linestring": @@ -75,29 +107,29 @@ public static ColumnDef build(String name, String charset, String type, short po case "multipolygon": case "polygon": case "point": - return new GeometryColumnDef(name, type, pos); + return GeometryColumnDef.create(name, type, pos); case "float": case "double": - return new FloatColumnDef(name, type, pos); + return FloatColumnDef.create(name, type, pos); case "decimal": - return new DecimalColumnDef(name, type, pos); + return DecimalColumnDef.create(name, type, pos); case "date": - return new DateColumnDef(name, type, pos); + return DateColumnDef.create(name, type, pos); case "datetime": case "timestamp": - return new DateTimeColumnDef(name, type, pos, columnLength); + return DateTimeColumnDef.create(name, type, pos, columnLength); case "time": - return new TimeColumnDef(name, type, pos, columnLength); + return TimeColumnDef.create(name, type, pos, columnLength); case "year": - return new YearColumnDef(name, type, pos); + return YearColumnDef.create(name, type, pos); case "enum": - return new EnumColumnDef(name, type, pos, enumValues); + return EnumColumnDef.create(name, type, pos, enumValues); case "set": - return new SetColumnDef(name, type, pos, enumValues); + return SetColumnDef.create(name, type, pos, enumValues); case "bit": - return new BitColumnDef(name, type, pos); + return BitColumnDef.create(name, type, pos); case "json": - return new JsonColumnDef(name, type, pos); + return JsonColumnDef.create(name, type, pos); default: throw new IllegalArgumentException("unsupported column type " + type); @@ -192,8 +224,10 @@ else if ( columnLength < ( 1 << 24) ) } } - public void setName(String name) { - this.name = name; + public ColumnDef withName(String name) { + return cloneSelfAndSet(clone -> { + clone.name = name; + }); } public String getName() { @@ -208,7 +242,18 @@ public int getPos() { return pos; } - public void setPos(short i) { - this.pos = i; + public ColumnDef withPos(short i) { + if (pos == i) { + return this; + } + return cloneSelfAndSet(clone -> { + clone.pos = i; + }); + } + + protected T cloneSelfAndSet(Consumer mutator) { + T clone = (T) clone(); + mutator.accept(clone); + return (T) getInterner().intern(clone); } } diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDefSerializer.java b/src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDefSerializer.java index 05d2b90f2..9c453e377 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDefSerializer.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDefSerializer.java @@ -12,7 +12,7 @@ public class ColumnDefSerializer extends JsonSerializer { public void serialize(ColumnDef def, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException { jgen.writeStartObject(); jgen.writeStringField("type", def.getType()); - jgen.writeStringField("name", def.name); + jgen.writeStringField("name", def.getName()); if ( def instanceof StringColumnDef ) { jgen.writeStringField("charset", ((StringColumnDef) def).getCharset()); diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDefWithLength.java b/src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDefWithLength.java index 58f2be213..6303e12ff 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDefWithLength.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/ColumnDefWithLength.java @@ -2,8 +2,10 @@ import com.zendesk.maxwell.producer.MaxwellOutputConfig; +import java.util.Objects; + public abstract class ColumnDefWithLength extends ColumnDef { - protected Long columnLength; + private Long columnLength; protected static ThreadLocal threadLocalBuilder = new ThreadLocal() { @Override @@ -19,7 +21,7 @@ public StringBuilder get() { } }; - public ColumnDefWithLength(String name, String type, short pos, Long columnLength) { + protected ColumnDefWithLength(String name, String type, short pos, Long columnLength) { super(name, type, pos); if ( columnLength == null ) this.columnLength = 0L; @@ -27,6 +29,22 @@ public ColumnDefWithLength(String name, String type, short pos, Long columnLengt this.columnLength = columnLength; } + @Override + public boolean equals(Object o) { + if (o.getClass() == getClass()) { + ColumnDefWithLength other = (ColumnDefWithLength)o; + return super.equals(o) + && Objects.equals(columnLength, other.columnLength); + } + return false; + } + + @Override + public int hashCode() { + int hash = super.hashCode(); + return 31 * hash + Objects.hash(columnLength); + } + @Override public String toSQL(Object value) throws ColumnDefCastException { return "'" + formatValue(value, new MaxwellOutputConfig()) + "'"; @@ -39,8 +57,10 @@ public Object asJSON(Object value, MaxwellOutputConfig config) throws ColumnDefC public Long getColumnLength() { return columnLength ; } - public void setColumnLength(long length) { - this.columnLength = length; + public ColumnDefWithLength withColumnLength(long length) { + return cloneSelfAndSet(clone -> { + clone.columnLength = length; + }); } protected abstract String formatValue(Object value, MaxwellOutputConfig config) throws ColumnDefCastException; diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/DateColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/DateColumnDef.java index d83b76646..61160d4de 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/DateColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/DateColumnDef.java @@ -3,10 +3,15 @@ import com.zendesk.maxwell.producer.MaxwellOutputConfig; public class DateColumnDef extends ColumnDef { - public DateColumnDef(String name, String type, short pos) { + private DateColumnDef(String name, String type, short pos) { super(name, type, pos); } + public static DateColumnDef create(String name, String type, short pos) { + DateColumnDef temp = new DateColumnDef(name, type, pos); + return (DateColumnDef) INTERNER.intern(temp); + } + @Override public String toSQL(Object value) { String formatted = DateFormatter.formatDate(value); diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/DateFormatter.java b/src/main/java/com/zendesk/maxwell/schema/columndef/DateFormatter.java index a2dac81c2..9d02037d2 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/DateFormatter.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/DateFormatter.java @@ -4,10 +4,10 @@ import java.util.*; public class DateFormatter { - private static TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC"); - private static ThreadLocal calendarThreadLocal = ThreadLocal.withInitial(() -> Calendar.getInstance()); - private static ThreadLocal calendarUTCThreadLocal = ThreadLocal.withInitial(() -> Calendar.getInstance(UTC_ZONE)); - private static ThreadLocal stringBuilderThreadLocal = ThreadLocal.withInitial(() -> new StringBuilder(32)); + private static final TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC"); + private static final ThreadLocal calendarThreadLocal = ThreadLocal.withInitial(() -> Calendar.getInstance()); + private static final ThreadLocal calendarUTCThreadLocal = ThreadLocal.withInitial(() -> Calendar.getInstance(UTC_ZONE)); + private static final ThreadLocal stringBuilderThreadLocal = ThreadLocal.withInitial(() -> new StringBuilder(32)); public static Timestamp extractTimestamp(Object value) throws IllegalArgumentException { if (value instanceof Long) { diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/DateTimeColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/DateTimeColumnDef.java index b717ff2d7..9b48c99e0 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/DateTimeColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/DateTimeColumnDef.java @@ -5,11 +5,18 @@ import java.sql.Timestamp; public class DateTimeColumnDef extends ColumnDefWithLength { - public DateTimeColumnDef(String name, String type, short pos, Long columnLength) { + + private final boolean isTimestamp = getType().equals("timestamp"); + + private DateTimeColumnDef(String name, String type, short pos, Long columnLength) { super(name, type, pos, columnLength); } - final private boolean isTimestamp = getType().equals("timestamp"); + public static DateTimeColumnDef create(String name, String type, short pos, Long columnLength) { + DateTimeColumnDef temp = new DateTimeColumnDef(name, type, pos, columnLength); + return (DateTimeColumnDef) INTERNER.intern(temp); + } + protected String formatValue(Object value, MaxwellOutputConfig config) throws ColumnDefCastException { // special case for those broken mysql dates. if ( value instanceof Long ) { @@ -18,14 +25,14 @@ protected String formatValue(Object value, MaxwellOutputConfig config) throws Co if ( config.zeroDatesAsNull ) return null; else - return appendFractionalSeconds("0000-00-00 00:00:00", 0, columnLength); + return appendFractionalSeconds("0000-00-00 00:00:00", 0, getColumnLength()); } } try { Timestamp ts = DateFormatter.extractTimestamp(value); String dateString = DateFormatter.formatDateTime(value, ts); - return appendFractionalSeconds(dateString, ts.getNanos(), columnLength); + return appendFractionalSeconds(dateString, ts.getNanos(), getColumnLength()); } catch ( IllegalArgumentException e ) { throw new ColumnDefCastException(this, value); } diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/DecimalColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/DecimalColumnDef.java index 9ed39e257..bad911757 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/DecimalColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/DecimalColumnDef.java @@ -3,10 +3,15 @@ import java.math.BigDecimal; public class DecimalColumnDef extends ColumnDef { - public DecimalColumnDef(String name, String type, short pos) { + private DecimalColumnDef(String name, String type, short pos) { super(name, type, pos); } + public static DecimalColumnDef create(String name, String type, short pos) { + DecimalColumnDef temp = new DecimalColumnDef(name, type, pos); + return (DecimalColumnDef) INTERNER.intern(temp); + } + @Override public String toSQL(Object value) { BigDecimal d = (BigDecimal) value; diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/EnumColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/EnumColumnDef.java index faa859134..bbb774b00 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/EnumColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/EnumColumnDef.java @@ -3,10 +3,15 @@ import com.zendesk.maxwell.producer.MaxwellOutputConfig; public class EnumColumnDef extends EnumeratedColumnDef { - public EnumColumnDef(String name, String type, short pos, String[] enumValues) { + private EnumColumnDef(String name, String type, short pos, String[] enumValues) { super(name, type, pos, enumValues); } + public static EnumColumnDef create(String name, String type, short pos, String[] enumValues) { + EnumColumnDef temp = new EnumColumnDef(name, type, pos, enumValues); + return (EnumColumnDef) INTERNER.intern(temp); + } + @Override public String toSQL(Object value) throws ColumnDefCastException { return "'" + asString(value) + "'"; @@ -26,7 +31,7 @@ private String asString(Object value) throws ColumnDefCastException { if (i == 0) return null; else - return enumValues[((Integer) value) - 1]; + return getEnumValues().get(((Integer) value) - 1); } else { throw new ColumnDefCastException(this, value); } diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/EnumeratedColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/EnumeratedColumnDef.java index 4f513deb3..cc557fbed 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/EnumeratedColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/EnumeratedColumnDef.java @@ -1,19 +1,41 @@ package com.zendesk.maxwell.schema.columndef; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; abstract public class EnumeratedColumnDef extends ColumnDef { @JsonProperty("enum-values") - protected String[] enumValues; + private final List enumValues; - public EnumeratedColumnDef(String name, String type, short pos, String [] enumValues) { + protected EnumeratedColumnDef(String name, String type, short pos, String [] enumValues) { super(name, type, pos); - this.enumValues = new String[enumValues.length]; - for ( int i = 0; i < enumValues.length; i++) - this.enumValues[i] = enumValues[i].intern(); + ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(enumValues.length); + for (String enumValue : enumValues) { + builder.add(enumValue.intern()); + } + this.enumValues = builder.build(); } - public String[] getEnumValues() { + public List getEnumValues() { return enumValues; } + + @Override + public boolean equals(Object o) { + if (o.getClass() == getClass()) { + EnumeratedColumnDef other = (EnumeratedColumnDef)o; + return super.equals(o) + && Objects.equals(enumValues, other.enumValues); + } + return false; + } + + @Override + public int hashCode() { + int hash = super.hashCode(); + return 31 * hash + Objects.hash(enumValues); + } } diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/FloatColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/FloatColumnDef.java index fd40dd786..3f2a0fafe 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/FloatColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/FloatColumnDef.java @@ -1,12 +1,14 @@ package com.zendesk.maxwell.schema.columndef; public class FloatColumnDef extends ColumnDef { - public FloatColumnDef() { } - public FloatColumnDef(String name, String type, short pos) { + private FloatColumnDef(String name, String type, short pos) { super(name, type, pos); } - public boolean signed; + public static FloatColumnDef create(String name, String type, short pos) { + FloatColumnDef temp = new FloatColumnDef(name, type, pos); + return (FloatColumnDef) INTERNER.intern(temp); + } @Override public String toSQL(Object value) { diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/GeometryColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/GeometryColumnDef.java index 327f9587f..bc60c7c1d 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/GeometryColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/GeometryColumnDef.java @@ -11,10 +11,15 @@ * Created by ben on 12/30/15. */ public class GeometryColumnDef extends ColumnDef { - public GeometryColumnDef(String name, String type, short pos) { + private GeometryColumnDef(String name, String type, short pos) { super(name, type, pos); } + public static GeometryColumnDef create(String name, String type, short pos) { + GeometryColumnDef temp = new GeometryColumnDef(name, type, pos); + return (GeometryColumnDef) INTERNER.intern(temp); + } + @Override public Object asJSON(Object value, MaxwellOutputConfig config) throws ColumnDefCastException { Geometry geometry = null; diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/IntColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/IntColumnDef.java index 3f1301c4c..c0f30aa90 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/IntColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/IntColumnDef.java @@ -2,17 +2,40 @@ import com.zendesk.maxwell.producer.MaxwellOutputConfig; +import java.util.Objects; + public class IntColumnDef extends ColumnDef { - public int bits; + private final int bits; - protected boolean signed; + private boolean signed; - public IntColumnDef(String name, String type, short pos, boolean signed) { + private IntColumnDef(String name, String type, short pos, boolean signed) { super(name, type, pos); this.signed = signed; this.bits = bitsFromType(type); } + public static IntColumnDef create(String name, String type, short pos, boolean signed) { + IntColumnDef temp = new IntColumnDef(name, type, pos, signed); + return (IntColumnDef) INTERNER.intern(temp); + } + + @Override + public boolean equals(Object o) { + if (o.getClass() == getClass()) { + IntColumnDef other = (IntColumnDef)o; + return super.equals(o) + && bits == other.bits + && signed == other.signed; + } + return false; + } + + @Override + public int hashCode() { + int hash = super.hashCode(); + return 31 * hash + Objects.hash(bits, signed); + } private long castUnsigned(Integer i, long max_value) { if ( i < 0 ) @@ -69,7 +92,9 @@ public boolean isSigned() { return signed; } - public void setSigned(boolean signed) { - this.signed = signed; + public IntColumnDef withSigned(boolean signed) { + return cloneSelfAndSet(clone -> { + clone.signed = signed; + }); } } diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/JsonColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/JsonColumnDef.java index 54a2cd1cd..0ffabccd5 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/JsonColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/JsonColumnDef.java @@ -9,10 +9,15 @@ import static com.github.shyiko.mysql.binlog.event.deserialization.ColumnType.*; public class JsonColumnDef extends ColumnDef { - public JsonColumnDef(String name, String type, short pos) { + private JsonColumnDef(String name, String type, short pos) { super(name, type, pos); } + public static JsonColumnDef create(String name, String type, short pos) { + JsonColumnDef temp = new JsonColumnDef(name, type, pos); + return (JsonColumnDef) INTERNER.intern(temp); + } + @Override public Object asJSON(Object value, MaxwellOutputConfig config) throws ColumnDefCastException { String jsonString; diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/SetColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/SetColumnDef.java index 0fdd2632c..d8bd3d18b 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/SetColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/SetColumnDef.java @@ -2,15 +2,21 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import com.zendesk.maxwell.producer.MaxwellOutputConfig; import org.apache.commons.lang3.StringUtils; public class SetColumnDef extends EnumeratedColumnDef { - public SetColumnDef(String name, String type, short pos, String[] enumValues) { + private SetColumnDef(String name, String type, short pos, String[] enumValues) { super(name, type, pos, enumValues); } + public static SetColumnDef create(String name, String type, short pos, String[] enumValues) { + SetColumnDef temp = new SetColumnDef(name, type, pos, enumValues); + return (SetColumnDef) INTERNER.intern(temp); + } + @Override public String toSQL(Object value) throws ColumnDefCastException { return "'" + StringUtils.join(asList(value), "'") + "'"; @@ -27,9 +33,10 @@ private ArrayList asList(Object value) throws ColumnDefCastException { } else if ( value instanceof Long ) { ArrayList values = new ArrayList<>(); long v = (Long) value; - for (int i = 0; i < enumValues.length; i++) { + List enumValues = getEnumValues(); + for (int i = 0; i < enumValues.size(); i++) { if (((v >> i) & 1) == 1) { - values.add(enumValues[i]); + values.add(enumValues.get(i)); } } return values; diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/StringColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/StringColumnDef.java index 44ba6f0aa..74d5ad1b9 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/StringColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/StringColumnDef.java @@ -1,8 +1,12 @@ package com.zendesk.maxwell.schema.columndef; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.nio.charset.UnsupportedCharsetException; +import java.util.Objects; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; import com.zendesk.maxwell.producer.MaxwellOutputConfig; import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Hex; @@ -12,24 +16,53 @@ import org.slf4j.LoggerFactory; public class StringColumnDef extends ColumnDef { - public String charset; + // mutability only allowed after clone and prior to insertion to interner + private String charset; - public StringColumnDef(String name, String type, short pos, String charset) { + private StringColumnDef(String name, String type, short pos, String charset) { super(name, type, pos); this.charset = charset; } + public static StringColumnDef create(String name, String type, short pos, String charset) { + StringColumnDef temp = new StringColumnDef(name, type, pos, charset); + return (StringColumnDef) INTERNER.intern(temp); + } + + @Override + public boolean equals(Object o) { + if (o.getClass() == getClass()) { + StringColumnDef other = (StringColumnDef) o; + return super.equals(other) + && Objects.equals(charset, other.charset); + } + return false; + } + + @Override + public int hashCode() { + int hash = super.hashCode(); + return 31 * hash + Objects.hash(charset); + } + public String getCharset() { return charset; } - public void setCharset(String charset) { - this.charset = charset; + public StringColumnDef withCharset(String charset) { + return cloneSelfAndSet(clone -> { + clone.charset = charset; + }); } - public void setDefaultCharset(String e) { - if ( this.charset == null ) - this.charset = e; + public StringColumnDef withDefaultCharset(String charset) { + if ( this.charset == null ) { + return cloneSelfAndSet(clone -> { + clone.charset = charset; + }); + } else { + return this; + } } @Override @@ -43,15 +76,20 @@ public String toSQL(Object value) { } } + @Override + protected Interner getInterner() { + return INTERNER; + } + // this could obviously be more complete. private Charset charsetForCharset() { switch(charset.toLowerCase()) { case "utf8": case "utf8mb4": - return Charset.forName("UTF-8"); + return StandardCharsets.UTF_8; case "latin1": case "ascii": return Charset.forName("Windows-1252"); case "ucs2": - return Charset.forName("UTF-16"); + return StandardCharsets.UTF_16; case "ujis": return Charset.forName("EUC-JP"); default: diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/TimeColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/TimeColumnDef.java index 74f0734ac..d674aff6c 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/TimeColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/TimeColumnDef.java @@ -6,22 +6,27 @@ import java.sql.Timestamp; public class TimeColumnDef extends ColumnDefWithLength { - public TimeColumnDef(String name, String type, short pos, Long columnLength) { + private TimeColumnDef(String name, String type, short pos, Long columnLength) { super(name, type, pos, columnLength); } + public static TimeColumnDef create(String name, String type, short pos, Long columnLength) { + TimeColumnDef temp = new TimeColumnDef(name, type, pos, columnLength); + return (TimeColumnDef) INTERNER.intern(temp); + } + protected String formatValue(Object value, MaxwellOutputConfig config) throws ColumnDefCastException { if ( value instanceof Timestamp ) { Time time = new Time(((Timestamp) value).getTime()); String timeAsStr = String.valueOf(time); - return appendFractionalSeconds(timeAsStr, ((Timestamp) value).getNanos(), this.columnLength); + return appendFractionalSeconds(timeAsStr, ((Timestamp) value).getNanos(), this.getColumnLength()); } else if ( value instanceof Long ) { Time time = new Time((Long) value / 1000); String timeAsStr = String.valueOf(time); - return appendFractionalSeconds(timeAsStr, (int) ((Long) value % 1000000) * 1000, this.columnLength); + return appendFractionalSeconds(timeAsStr, (int) ((Long) value % 1000000) * 1000, this.getColumnLength()); } else if ( value instanceof Time ){ return String.valueOf((Time) value); } else { diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/YearColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/YearColumnDef.java index 473243194..562e8ed6e 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/YearColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/YearColumnDef.java @@ -6,10 +6,15 @@ import java.util.Calendar; public class YearColumnDef extends ColumnDef { - public YearColumnDef(String name, String type, short pos) { + private YearColumnDef(String name, String type, short pos) { super(name, type, pos); } + public static YearColumnDef create(String name, String type, short pos) { + YearColumnDef temp = new YearColumnDef(name, type, pos); + return (YearColumnDef) INTERNER.intern(temp); + } + @Override public Object asJSON(Object value, MaxwellOutputConfig outputConfig) { if ( value instanceof Date ) { diff --git a/src/main/java/com/zendesk/maxwell/schema/ddl/TableAlter.java b/src/main/java/com/zendesk/maxwell/schema/ddl/TableAlter.java index 50d8651a8..9c49e01d6 100644 --- a/src/main/java/com/zendesk/maxwell/schema/ddl/TableAlter.java +++ b/src/main/java/com/zendesk/maxwell/schema/ddl/TableAlter.java @@ -59,8 +59,9 @@ public ResolvedTableAlter resolve(Schema schema) throws InvalidSchemaError { if ( convertCharset != null ) { for ( StringColumnDef sc : table.getStringColumns() ) { - if (sc.getCharset() == null || !sc.getCharset().toLowerCase().equals("binary") ) - sc.setCharset(convertCharset); + if (sc.getCharset() == null || !sc.getCharset().toLowerCase().equals("binary") ) { + table.replaceColumn(sc.getPos(), sc.withCharset(convertCharset)); + } } } diff --git a/src/test/java/com/zendesk/maxwell/schema/SchemaCaptureTest.java b/src/test/java/com/zendesk/maxwell/schema/SchemaCaptureTest.java index 2f534ece0..8566c1ad5 100644 --- a/src/test/java/com/zendesk/maxwell/schema/SchemaCaptureTest.java +++ b/src/test/java/com/zendesk/maxwell/schema/SchemaCaptureTest.java @@ -140,12 +140,12 @@ public void testEnums() throws SQLException, InvalidSchemaError, IOException { assertThat(columns[0], notNullValue()); assertThat(columns[0], instanceOf(EnumColumnDef.class)); assertThat(columns[0].getName(), is("language")); - assertArrayEquals(((EnumColumnDef) columns[0]).getEnumValues(), new String[] {"en-US", "de-DE"}); + assertEquals(((EnumColumnDef) columns[0]).getEnumValues(), List.of("en-US", "de-DE")); assertThat(columns[1], notNullValue()); assertThat(columns[1], instanceOf(EnumColumnDef.class)); assertThat(columns[1].getName(), is("decimal_separator")); - assertArrayEquals(((EnumColumnDef) columns[1]).getEnumValues(), new String[] {",", "."}); + assertEquals(((EnumColumnDef) columns[1]).getEnumValues(), List.of(",", ".")); } @Test diff --git a/src/test/java/com/zendesk/maxwell/schema/columndef/ColumnDefTest.java b/src/test/java/com/zendesk/maxwell/schema/columndef/ColumnDefTest.java index 6c04f4471..1fd813227 100644 --- a/src/test/java/com/zendesk/maxwell/schema/columndef/ColumnDefTest.java +++ b/src/test/java/com/zendesk/maxwell/schema/columndef/ColumnDefTest.java @@ -1,24 +1,53 @@ package com.zendesk.maxwell.schema.columndef; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.*; +import com.google.common.collect.ImmutableList; +import com.zendesk.maxwell.TestWithNameLogging; +import com.zendesk.maxwell.producer.MaxwellOutputConfig; +import com.zendesk.maxwell.row.RawJSONString; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Date; import java.util.GregorianCalendar; +import java.util.List; import java.util.TimeZone; -import com.zendesk.maxwell.TestWithNameLogging; -import com.zendesk.maxwell.producer.MaxwellOutputConfig; -import com.zendesk.maxwell.row.RawJSONString; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; public class ColumnDefTest extends TestWithNameLogging { + private final List> ALL_CLASSES = ImmutableList.>builder() + .add(BigIntColumnDef.class) + .add(BitColumnDef.class) + .add(ColumnDef.class) + .add(ColumnDefWithLength.class) + .add(DateColumnDef.class) + .add(DateTimeColumnDef.class) + .add(DecimalColumnDef.class) + .add(EnumColumnDef.class) + .add(EnumeratedColumnDef.class) + .add(FloatColumnDef.class) + .add(GeometryColumnDef.class) + .add(IntColumnDef.class) + .add(JsonColumnDef.class) + .add(SetColumnDef.class) + .add(StringColumnDef.class) + .add(TimeColumnDef.class) + .add(YearColumnDef.class) + .build(); + private ColumnDef build(String type, boolean signed) { return ColumnDef.build("bar", "", type, (short) 1, signed, null, null); } @@ -365,4 +394,123 @@ public void TestBit() throws ColumnDefCastException { assertThat(d.toSQL(bO), is("0")); } + @Test + public void testEquality() throws ColumnDefCastException { + ColumnDef a = ColumnDef.build("bar", "utf8", "varchar", (short) 1, false, null, null); + ColumnDef b = ColumnDef.build("bar", "utf8", "varchar", (short) 1, false, null, null); + Assert.assertTrue(a == b); // verify auto-interning works + } + + @Test + public void testWithPos() throws ColumnDefCastException { + ColumnDef a = ColumnDef.build("bar", "utf8", "varchar", (short) 1, false, null, null); + ColumnDef b = ColumnDef.build("bar", "utf8", "varchar", (short) 2, false, null, null); + ColumnDef aToB = a.withPos((short) 2); + ColumnDef bToA = b.withPos((short) 1); + Assert.assertTrue(a.getPos() == 1); + Assert.assertTrue(b.getPos() == 2); + Assert.assertTrue(a == bToA); // verify auto-interning works + Assert.assertTrue(b == aToB); // verify auto-interning works + } + + @Test + public void testWithName() throws ColumnDefCastException { + ColumnDef a = ColumnDef.build("foo", "utf8", "varchar", (short) 1, false, null, null); + ColumnDef b = ColumnDef.build("bar", "utf8", "varchar", (short) 1, false, null, null); + ColumnDef aToB = a.withName("bar"); + ColumnDef bToA = b.withName("foo"); + Assert.assertEquals("foo", a.getName()); + Assert.assertEquals("bar", b.getName()); + Assert.assertTrue(a == bToA); // verify auto-interning works + Assert.assertTrue(b == aToB); // verify auto-interning works + } + + /** + * Series of checks that attempt to detect a modification that will introduce mutability into ColumnDefs. This isn't + * perfect, it won't detect exposing a mutable List, but will catch common errors in the rare cases someone adds a + * type + */ + @Test + public void testInterfaceImmutability() { + // not sure how to use testng equivalent of dataprovider with junit + // all ColumnDef classes should be listed + for (Class clazz : ALL_CLASSES) { + List> classesToTest = new ArrayList<>(); + classesToTest.add(clazz); + Class nextClazz = clazz.getSuperclass(); + while (nextClazz != null && nextClazz != Object.class) { + classesToTest.add(nextClazz); + nextClazz = nextClazz.getSuperclass(); + } + for (Class cut : classesToTest) { + Field[] declaredFields = cut.getDeclaredFields(); + String className = cut.getName(); + + // check for setters + boolean foundEquals = false; + boolean foundHashCode = false; + for (Method m : cut.getDeclaredMethods()) { + if (m.getName().startsWith("set")) { + Assert.fail(className + ": All methods extending ColumnDef must be immutable so hashCode/equals work. Use withXXX and clone object instead of setting values"); + } else if (m.getName().equals("equals") && m.getParameterCount() == 1 && m.getParameterTypes()[0] == Object.class) { + foundEquals = true; + } else if (m.getName().equals("hashCode") && m.getParameterCount() == 0) { + foundHashCode = true; + } + } + + for (Field field : declaredFields) { + final int modifiers = field.getModifiers(); + final boolean isStatic = Modifier.isStatic(modifiers); + if (isStatic) { + // assume static is fine + continue; + } + + final boolean isFinal = Modifier.isFinal(modifiers); + final boolean isPrivate = Modifier.isPrivate(modifiers); + System.out.println("Checking " + className + "." + field.getName() + " " + isPrivate); + + // check field immutability + if (!isFinal && !isPrivate) { + Assert.fail(className + ": Non-private field " + field.getName() + " is mutable. All classes must have immutable interfaces (public, protected, package-private)"); + } + if (!isPrivate) { + Assert.fail(className + ": Should not have direct access to member variable " + field.getName()); + } + + if (!(foundEquals && foundHashCode)) { + if (cut == DateTimeColumnDef.class && field.getName().equals("isTimestamp")) { + // this field is derived from type, there's no need for equals/hashCode with this variable + } else { + Assert.fail(className + " has member variables, but not equals/HashCode implementations"); + } + } + } + } + } + } + + + @Test + public void testConstructorDefinitions() { + for (Class clazz : ALL_CLASSES) { + if (Modifier.isAbstract(clazz.getModifiers())) { + continue; + } + for (Constructor c : clazz.getDeclaredConstructors()) { + Assert.assertTrue(clazz.getName() + ": ColumnDef concrete constructors should all be private and have " + + "a create method to intern new creations", Modifier.isPrivate(c.getModifiers())); + } + boolean foundCreate = false; + for (Method m : clazz.getDeclaredMethods()) { + if ("create".equals(m.getName())) { + foundCreate = true; + Assert.assertTrue(clazz.getName() + ": create method must be static", Modifier.isStatic(m.getModifiers())); + } + } + Assert.assertTrue(clazz.getName() + ": all ColumnDef methods have a create method to intern instances", foundCreate); + } + } + } From b049757b767b7e2f667cc07d1bcd28232acfb24d Mon Sep 17 00:00:00 2001 From: Gil Cottle Date: Tue, 6 Jul 2021 12:43:00 -0700 Subject: [PATCH 2/3] rename TableColumnList member variable/className per comments --- .../maxwell/schema/TableColumnList.java | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/schema/TableColumnList.java b/src/main/java/com/zendesk/maxwell/schema/TableColumnList.java index 9d4e2cbce..f5f2892ed 100644 --- a/src/main/java/com/zendesk/maxwell/schema/TableColumnList.java +++ b/src/main/java/com/zendesk/maxwell/schema/TableColumnList.java @@ -13,26 +13,26 @@ public class TableColumnList implements Iterable { // reduce count of duplicate ArrayLists/Sets for column lists by providing mutability for the class // through references to an internal immutable object that gets interned. This greatly reduces overhead for // table definitions that are duplicated across databases - private InternalImmutableTableColumnList internalState; + private ImmutableColumnList columnList; public TableColumnList(List columns) { - this.internalState = InternalImmutableTableColumnList.create(columns); + this.columnList = ImmutableColumnList.create(columns); } public Iterator iterator() { - return internalState.getColumns().iterator(); + return columnList.getColumns().iterator(); } public List getList() { - return internalState.getColumns(); + return columnList.getColumns(); } public synchronized Set columnNames() { - return internalState.getColumnNames(); + return columnList.getColumnNames(); } public synchronized int indexOf(String name) { - return indexOf(internalState.getColumns(), name); + return indexOf(columnList.getColumns(), name); } private synchronized int indexOf(List columns, String name) { @@ -46,7 +46,7 @@ private synchronized int indexOf(List columns, String name) { } public ColumnDef findByName(String name) { - List columns = internalState.getColumns(); + List columns = columnList.getColumns(); int index = indexOf(columns, name); if ( index == -1 ) return null; @@ -55,45 +55,45 @@ public ColumnDef findByName(String name) { } public synchronized void add(int index, ColumnDef definition) { - List columns = internalState.getColumns(); + List columns = columnList.getColumns(); ArrayList tempList = new ArrayList<>(columns.size() + 1); tempList.addAll(columns); tempList.add(index, definition); - internalState = InternalImmutableTableColumnList.create(tempList); + columnList = ImmutableColumnList.create(tempList); } public synchronized void replace(int index, ColumnDef definition) { - List columns = internalState.getColumns(); + List columns = columnList.getColumns(); ArrayList tempList = new ArrayList<>(columns.size()); tempList.addAll(columns); tempList.set(index, definition); - internalState = InternalImmutableTableColumnList.create(tempList); + columnList = ImmutableColumnList.create(tempList); } public synchronized ColumnDef remove(int index) { - List columns = internalState.getColumns(); + List columns = columnList.getColumns(); ArrayList tempList = new ArrayList<>(columns.size()); tempList.addAll(columns); ColumnDef c = tempList.remove(index); - internalState = InternalImmutableTableColumnList.create(tempList); + columnList = ImmutableColumnList.create(tempList); return c; } public synchronized ColumnDef get(int index) { - return internalState.getColumns().get(index); + return columnList.getColumns().get(index); } public int size() { - return internalState.getColumns().size(); + return columnList.getColumns().size(); } - private static final class InternalImmutableTableColumnList { - private static final Interner INTERNER = Interners.newWeakInterner(); + private static final class ImmutableColumnList { + private static final Interner INTERNER = Interners.newWeakInterner(); private final List columns; private Set columnNames; // not part of equals because it's derived statically - private InternalImmutableTableColumnList(List columns) { + private ImmutableColumnList(List columns) { ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(columns.size()); int i = 0; for (ColumnDef column : columns) { @@ -102,14 +102,14 @@ private InternalImmutableTableColumnList(List columns) { this.columns = builder.build(); } - public static InternalImmutableTableColumnList create(List columns) { - return INTERNER.intern(new InternalImmutableTableColumnList(columns)); + public static ImmutableColumnList create(List columns) { + return INTERNER.intern(new ImmutableColumnList(columns)); } @Override public boolean equals(Object o) { - if (o instanceof InternalImmutableTableColumnList) { - InternalImmutableTableColumnList other = (InternalImmutableTableColumnList) o; + if (o instanceof ImmutableColumnList) { + ImmutableColumnList other = (ImmutableColumnList) o; return columns.equals(other.columns); } return false; From b3ae769b1ca126ec4da8e6db8fc34fed746840f8 Mon Sep 17 00:00:00 2001 From: Gil Cottle Date: Thu, 15 Jul 2021 13:52:24 -0700 Subject: [PATCH 3/3] minor cleanup ColumnDef package remove unused imports StringColumnDef remove getInterner method since it inherits the same from its parent. This was left over from when it started as each class having their own Interner instance. --- .../schema/columndef/JsonColumnDef.java | 2 -- .../schema/columndef/StringColumnDef.java | 18 +++--------------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/JsonColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/JsonColumnDef.java index 0ffabccd5..2335be183 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/JsonColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/JsonColumnDef.java @@ -6,8 +6,6 @@ import java.io.IOException; -import static com.github.shyiko.mysql.binlog.event.deserialization.ColumnType.*; - public class JsonColumnDef extends ColumnDef { private JsonColumnDef(String name, String type, short pos) { super(name, type, pos); diff --git a/src/main/java/com/zendesk/maxwell/schema/columndef/StringColumnDef.java b/src/main/java/com/zendesk/maxwell/schema/columndef/StringColumnDef.java index 74d5ad1b9..ac2ba47ef 100644 --- a/src/main/java/com/zendesk/maxwell/schema/columndef/StringColumnDef.java +++ b/src/main/java/com/zendesk/maxwell/schema/columndef/StringColumnDef.java @@ -1,19 +1,12 @@ package com.zendesk.maxwell.schema.columndef; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.nio.charset.UnsupportedCharsetException; -import java.util.Objects; - -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; import com.zendesk.maxwell.producer.MaxwellOutputConfig; import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Hex; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Objects; public class StringColumnDef extends ColumnDef { // mutability only allowed after clone and prior to insertion to interner @@ -76,11 +69,6 @@ public String toSQL(Object value) { } } - @Override - protected Interner getInterner() { - return INTERNER; - } - // this could obviously be more complete. private Charset charsetForCharset() { switch(charset.toLowerCase()) {