Skip to content

Commit

Permalink
Merge pull request #1719 from redcape/columndef-immutable
Browse files Browse the repository at this point in the history
Intern ColumnDefs to reduce memory by more than half for multi-tenant DB use-cases
  • Loading branch information
osheroff authored Jul 29, 2021
2 parents f99e3f2 + b3ae769 commit f3616ac
Show file tree
Hide file tree
Showing 26 changed files with 629 additions and 175 deletions.
70 changes: 35 additions & 35 deletions src/main/java/com/zendesk/maxwell/schema/MysqlSavedSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.zendesk.maxwell.util.ConnectionPool;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.text.StrTokenizer;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -635,27 +634,29 @@ public Position getPosition() {
private void fixUnsignedColumns(Schema recaptured) throws SQLException, InvalidSchemaError {
int unsignedDiffs = 0;

for ( Pair<ColumnDef, ColumnDef> pair : schema.matchColumns(recaptured) ) {
ColumnDef cA = pair.getLeft();
ColumnDef cB = pair.getRight();
for ( Pair<Schema.FullColumnDef, Schema.FullColumnDef> pair : schema.matchColumns(recaptured) ) {
Table schemaTable = pair.getLeft().getTable();
ColumnDef schemaCol = pair.getLeft().getColumnDef();
ColumnDef recapturedCol = pair.getRight().getColumnDef();

if (cA instanceof IntColumnDef) {
if (cB != null && cB instanceof IntColumnDef) {
if (((IntColumnDef) cA).isSigned() && !((IntColumnDef) cB).isSigned()) {
((IntColumnDef) cA).setSigned(false);
if (schemaCol instanceof IntColumnDef) {
if (recapturedCol != null && recapturedCol instanceof IntColumnDef) {
if (((IntColumnDef) schemaCol).isSigned() && !((IntColumnDef) recapturedCol).isSigned()) {
schemaTable.replaceColumn(schemaCol.getPos(), ((IntColumnDef) schemaCol).withSigned(false));
unsignedDiffs++;
}
} else {
LOGGER.warn("warning: Couldn't check for unsigned integer bug on column " + cA.getName() +
LOGGER.warn("warning: Couldn't check for unsigned integer bug on column " + schemaCol.getName() +
". You may want to recapture your schema");
}
} else if (cA instanceof BigIntColumnDef) {
if (cB != null && cB instanceof BigIntColumnDef) {
if (((BigIntColumnDef) cA).isSigned() && !((BigIntColumnDef) cB).isSigned())
((BigIntColumnDef) cA).setSigned(false);
} else if (schemaCol instanceof BigIntColumnDef) {
if (recapturedCol != null && recapturedCol instanceof BigIntColumnDef) {
if (((BigIntColumnDef) schemaCol).isSigned() && !((BigIntColumnDef) recapturedCol).isSigned()) {
schemaTable.replaceColumn(schemaCol.getPos(), ((BigIntColumnDef) schemaCol).withSigned(false));
}
unsignedDiffs++;
} else {
LOGGER.warn("warning: Couldn't check for unsigned integer bug on column " + cA.getName() +
LOGGER.warn("warning: Couldn't check for unsigned integer bug on column " + schemaCol.getName() +
". You may want to recapture your schema");
}
}
Expand All @@ -675,43 +676,42 @@ private void fixUnsignedColumns(Schema recaptured) throws SQLException, InvalidS
}
}

private void fixColumnCases(Schema recaptured) throws SQLException {
private void fixColumnCases(Schema recaptured) throws InvalidSchemaError {
int caseDiffs = 0;

for ( Pair<ColumnDef, ColumnDef> pair : schema.matchColumns(recaptured) ) {
ColumnDef cA = pair.getLeft();
ColumnDef cB = pair.getRight();
for (Pair<Schema.FullColumnDef, Schema.FullColumnDef> pair : schema.matchColumns(recaptured)) {
Table schemaTable = pair.getLeft().getTable();
ColumnDef schemaCol = pair.getLeft().getColumnDef();
ColumnDef recapturedCol = pair.getRight().getColumnDef();

if ( !cA.getName().equals(cB.getName()) ) {
LOGGER.info("correcting column case of `" + cA.getName() + "` to `" + cB.getName() + "`. Will save a full schema snapshot after the new DDL update is processed.");
if (!schemaCol.getName().equals(recapturedCol.getName())) {
LOGGER.info("correcting column case of `" + schemaCol.getName() + "` to `" + recapturedCol.getName() + "`. Will save a full schema snapshot after the new DDL update is processed.");
caseDiffs++;
cA.setName(cB.getName());
schemaTable.replaceColumn(schemaCol.getPos(), schemaCol.withName(recapturedCol.getName()));
}
}

if ( caseDiffs > 0 )
this.shouldSnapshotNextSchema = true;
}

private void fixColumnLength(Schema recaptured) throws SQLException {
private void fixColumnLength(Schema recaptured) throws InvalidSchemaError {
int colLengthDiffs = 0;

for ( Pair<ColumnDef, ColumnDef> pair : schema.matchColumns(recaptured) ) {
ColumnDef cA = pair.getLeft();
ColumnDef cB = pair.getRight();
for ( Pair<Schema.FullColumnDef, Schema.FullColumnDef> pair : schema.matchColumns(recaptured) ) {
Table schemaTable = pair.getLeft().getTable();
ColumnDef schemaCol = pair.getLeft().getColumnDef();
ColumnDef recapturedCol = pair.getRight().getColumnDef();

if (cA instanceof ColumnDefWithLength) {
if (cB != null && cB instanceof ColumnDefWithLength) {
long aColLength = ((ColumnDefWithLength) cA).getColumnLength();
long bColLength = ((ColumnDefWithLength) cB).getColumnLength();
if (schemaCol instanceof ColumnDefWithLength) {
if (recapturedCol != null && recapturedCol instanceof ColumnDefWithLength) {
long aColLength = ((ColumnDefWithLength) schemaCol).getColumnLength();
long bColLength = ((ColumnDefWithLength) recapturedCol).getColumnLength();

if ( aColLength != bColLength ) {
colLengthDiffs++;
LOGGER.info("correcting column length of `" + cA.getName() + "` to " + bColLength + ". Will save a full schema snapshot after the new DDL update is processed.");
((ColumnDefWithLength) cA).setColumnLength(bColLength);
LOGGER.info("correcting column length of `" + schemaCol.getName() + "` to " + bColLength + ". Will save a full schema snapshot after the new DDL update is processed.");
schemaTable.replaceColumn(schemaCol.getPos(), ((ColumnDefWithLength) schemaCol).withColumnLength(bColLength));
}
} else {
LOGGER.warn("warning: Couldn't check for column length on column " + cA.getName() +
LOGGER.warn("warning: Couldn't check for column length on column " + schemaCol.getName() +
". You may want to recapture your schema");
}
}
Expand Down
33 changes: 30 additions & 3 deletions src/main/java/com/zendesk/maxwell/schema/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ public CaseSensitivity getCaseSensitivity() {
return sensitivity;
};

public List<Pair<ColumnDef, ColumnDef>> matchColumns(Schema thatSchema) {
ArrayList<Pair<ColumnDef, ColumnDef>> list = new ArrayList<>();
public List<Pair<FullColumnDef, FullColumnDef>> matchColumns(Schema thatSchema) {
ArrayList<Pair<FullColumnDef, FullColumnDef>> list = new ArrayList<>();

for ( Database thisDatabase : this.getDatabases() ) {
Database thatDatabase = thatSchema.findDatabase(thisDatabase.getName());
Expand All @@ -122,10 +122,37 @@ public List<Pair<ColumnDef, ColumnDef>> matchColumns(Schema thatSchema) {
for ( ColumnDef thisColumn : thisTable.getColumnList() ) {
ColumnDef thatColumn = thatTable.findColumn(thisColumn.getName());
if ( thatColumn != null )
list.add(Pair.of(thisColumn, thatColumn));
list.add(Pair.of(
new FullColumnDef(thisDatabase, thisTable, thisColumn),
new FullColumnDef(thatDatabase, thatTable, thatColumn)
));
}
}
}
return list;
}

/**
 * Immutable holder that keeps a {@link ColumnDef} together with the
 * {@link Table} and {@link Database} references it was created with.
 *
 * <p>{@code matchColumns} builds these from a column taken out of the
 * table's column list, so callers can get back to the owning table
 * (for example to replace the column) without a second lookup.
 */
public static class FullColumnDef {
    private final Database owningDatabase;
    private final Table owningTable;
    private final ColumnDef definition;

    /**
     * @param db        database reference to carry along
     * @param table     table reference to carry along
     * @param columnDef column definition to carry along
     */
    public FullColumnDef(Database db, Table table, ColumnDef columnDef) {
        owningDatabase = db;
        owningTable = table;
        definition = columnDef;
    }

    /** @return the database passed to the constructor */
    public Database getDb() {
        return owningDatabase;
    }

    /** @return the table passed to the constructor */
    public Table getTable() {
        return owningTable;
    }

    /** @return the column definition passed to the constructor */
    public ColumnDef getColumnDef() {
        return definition;
    }
}
}
16 changes: 10 additions & 6 deletions src/main/java/com/zendesk/maxwell/schema/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private void diffColumnList(List<String> diffs, Table a, Table b, String nameA,
EnumeratedColumnDef enumA, enumB;
enumA = (EnumeratedColumnDef) column;
enumB = (EnumeratedColumnDef) other;
if ( !Arrays.deepEquals(enumA.getEnumValues(), enumB.getEnumValues()) ) {
if ( !enumA.getEnumValues().equals(enumB.getEnumValues()) ) {
diffs.add(colName + "has an enum value mismatch, "
+ StringUtils.join(enumA.getEnumValues(), ",")
+ " vs "
Expand Down Expand Up @@ -223,8 +223,10 @@ public void diff(List<String> diffs, Table other, String nameA, String nameB) {
}

public void setDefaultColumnCharsets() {
String newCharset = this.getCharset();
for ( StringColumnDef c : getStringColumns() ) {
c.setDefaultCharset(this.getCharset());
int index = c.getPos();
columns.replace(index, c.withDefaultCharset(newCharset));
}
}

Expand All @@ -243,10 +245,12 @@ public void removeColumn(int idx) {
}

public void renameColumn(int idx, String name) throws InvalidSchemaError {
ColumnDef column = columns.get(idx).clone();
column.setName(name);
columns.remove(idx);
columns.add(idx, column);
ColumnDef column = columns.get(idx).withName(name);
columns.replace(idx, column);
}

/**
 * Puts {@code definition} in place of the column currently stored at
 * position {@code idx} by delegating to the table's column list.
 *
 * @param idx        position of the column to overwrite
 * @param definition column definition to store at that position
 * @throws InvalidSchemaError declared for callers; not thrown by the
 *         delegation visible here
 */
public void replaceColumn(int idx, ColumnDef definition) throws InvalidSchemaError {
    this.columns.replace(idx, definition);
}

public void changeColumn(int idx, ColumnPosition position, ColumnDef definition, List<DeferredPositionUpdate> deferred) throws InvalidSchemaError {
Expand Down
116 changes: 87 additions & 29 deletions src/main/java/com/zendesk/maxwell/schema/TableColumnList.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,40 @@

import java.util.*;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import com.zendesk.maxwell.schema.columndef.ColumnDef;


public class TableColumnList implements Iterable<ColumnDef> {
private final List<ColumnDef> columns;
private Set<String> columnNames;
// reduce count of duplicate ArrayLists/Sets for column lists by providing mutability for the class
// through references to an internal immutable object that gets interned. This greatly reduces overhead for
// table definitions that are duplicated across databases
private ImmutableColumnList columnList;

public TableColumnList(List<ColumnDef> columns) {
this.columns = columns;
renumberColumns();
this.columnList = ImmutableColumnList.create(columns);
}

public Iterator<ColumnDef> iterator() {
return columns.iterator();
return columnList.getColumns().iterator();
}

public List<ColumnDef> getList() {
return columns;
return columnList.getColumns();
}

public synchronized Set<String> columnNames() {
if ( columnNames == null ) {
columnNames = new HashSet<>();
for ( ColumnDef cf : columns )
columnNames.add(cf.getName().toLowerCase().intern());
}
return columnNames;
return columnList.getColumnNames();
}

public synchronized int indexOf(String name) {
return indexOf(columnList.getColumns(), name);
}

private synchronized int indexOf(List<ColumnDef> columns, String name) {
String lcName = name.toLowerCase();

for ( int i = 0 ; i < columns.size(); i++ ) {
Expand All @@ -42,43 +46,97 @@ public synchronized int indexOf(String name) {
}

public ColumnDef findByName(String name) {
int index = indexOf(name);
List<ColumnDef> columns = columnList.getColumns();
int index = indexOf(columns, name);
if ( index == -1 )
return null;
else
return columns.get(index);
}

public synchronized void add(int index, ColumnDef definition) {
columns.add(index, definition);

if ( columnNames != null )
columnNames.add(definition.getName().toLowerCase());
List<ColumnDef> columns = columnList.getColumns();
ArrayList<ColumnDef> tempList = new ArrayList<>(columns.size() + 1);
tempList.addAll(columns);
tempList.add(index, definition);
columnList = ImmutableColumnList.create(tempList);
}

renumberColumns();
/**
 * Overwrites the column at {@code index} with {@code definition}.
 *
 * <p>The current column list is never modified: a mutable copy is made,
 * the slot is overwritten there, and a fresh {@code ImmutableColumnList}
 * created from the copy becomes the new backing list.
 *
 * @param index      position of the column to overwrite
 * @param definition column definition to store at that position
 */
public synchronized void replace(int index, ColumnDef definition) {
    List<ColumnDef> updated = new ArrayList<>(columnList.getColumns());
    updated.set(index, definition);
    columnList = ImmutableColumnList.create(updated);
}

public synchronized ColumnDef remove(int index) {
ColumnDef c = columns.remove(index);

if ( columnNames != null )
columnNames.remove(c.getName().toLowerCase());
renumberColumns();
List<ColumnDef> columns = columnList.getColumns();
ArrayList<ColumnDef> tempList = new ArrayList<>(columns.size());
tempList.addAll(columns);
ColumnDef c = tempList.remove(index);
columnList = ImmutableColumnList.create(tempList);
return c;
}

public synchronized ColumnDef get(int index) {
return columns.get(index);
return columnList.getColumns().get(index);
}

public int size() {
return columns.size();
return columnList.getColumns().size();
}

private void renumberColumns() {
short i = 0 ;
for ( ColumnDef c : columns ) {
c.setPos(i++);
/**
 * Read-only, canonicalized list of column definitions.
 *
 * <p>Instances are obtained through {@link #create(List)}, which passes every
 * new list through a weak interner. Two lists whose columns are
 * {@code equals} therefore resolve to the same instance, so one instance may
 * back several owners at once.
 */
private static final class ImmutableColumnList {
    private static final Interner<ImmutableColumnList> INTERNER = Interners.newWeakInterner();

    private final List<ColumnDef> columns;
    // Lazily derived from `columns`, hence not part of equals/hashCode.
    // volatile: an interned instance can be reached from several owners, and
    // their own `synchronized` methods lock on different monitors, so they do
    // not guard this shared cache.
    private volatile Set<String> columnNames;

    /**
     * Copies {@code columns} into an immutable list, storing for each entry
     * the result of {@code withPos(i)} where {@code i} is its 0-based position.
     */
    private ImmutableColumnList(List<ColumnDef> columns) {
        ImmutableList.Builder<ColumnDef> builder = ImmutableList.builderWithExpectedSize(columns.size());
        int i = 0;
        for (ColumnDef column : columns) {
            builder.add(column.withPos((short) i++));
        }
        this.columns = builder.build();
    }

    /**
     * Builds a list from {@code columns} and returns its interned
     * representative (an existing equal instance if the interner has one).
     *
     * @param columns column definitions in table order
     * @return the canonical instance for that sequence of columns
     */
    public static ImmutableColumnList create(List<ColumnDef> columns) {
        return INTERNER.intern(new ImmutableColumnList(columns));
    }

    /** Equality is defined by the column list only. */
    @Override
    public boolean equals(Object o) {
        if (o instanceof ImmutableColumnList) {
            ImmutableColumnList other = (ImmutableColumnList) o;
            return columns.equals(other.columns);
        }
        return false;
    }

    /** Consistent with {@link #equals(Object)}: hash of the column list. */
    @Override
    public int hashCode() {
        return columns.hashCode();
    }

    /** @return the immutable list of columns */
    public List<ColumnDef> getColumns() {
        return columns;
    }

    /**
     * Returns the lower-cased column names, computing them on first use.
     *
     * <p>The cached field is read exactly once into a local so the value
     * that was null-checked is the value returned. Concurrent first calls may
     * each compute the set; the results are equal, so the last write winning
     * is harmless.
     *
     * @return immutable set of lower-cased column names
     */
    public Set<String> getColumnNames() {
        Set<String> names = columnNames;
        if ( names == null ) {
            names = generateColumnNames();
            columnNames = names;
        }
        return names;
    }

    /** Builds the immutable set of lower-cased, interned column names. */
    private Set<String> generateColumnNames() {
        ImmutableSet.Builder<String> setBuilder = ImmutableSet.builderWithExpectedSize(columns.size());
        for ( ColumnDef cf : columns ) {
            setBuilder.add(cf.getName().toLowerCase().intern());
        }
        return setBuilder.build();
    }
}
}
Expand Down
Loading

0 comments on commit f3616ac

Please sign in to comment.