Skip to content

Commit

Permalink
Merge pull request #10 from zendesk/ben/small_fixes
Browse files Browse the repository at this point in the history
Some small fixes
  • Loading branch information
Ben Osheroff committed Mar 6, 2015
2 parents b8efcf2 + 3af3f21 commit 8cbf185
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 57 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
data/*.yaml
config/*.yml
/target
/config.properties
/maxwell.position
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,34 @@
# Maxwell's daemon

This is Maxwell's daemon, an application that processes MySQL binlogs and outputs the changesets as JSON to Kafka.
It's conceptually similar to databus and mypipe, but it can:

- follow schema changes coming down the replication stream
- output the changed rows as JSON (should we support avro? I dunno.)
- recover the binlog position where it left off.

It's intended as an ETL tool and as a source for event-based services.

## quickstart

this will start a Maxwell's daemon for you to play around with:

```
mysql> GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'XXXXXX';
mysql> GRANT SELECT on *.* to 'maxwell'@'%';
mysql> GRANT REPLICATION CLIENT ON *.* TO 'maxwell'@'%;
curl -sLo - https://github.com/zendesk/maxwell/releases/download/v0.1/maxwell-0.1.0.tar.gz | tar zxvf -
cd maxwell-0.1.0
bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' --producer=stdout
```

## something actually useful

```
cp config.properties.example config.properties
# edit config.properties
bin/maxwell
```

9 changes: 0 additions & 9 deletions bin/console

This file was deleted.

2 changes: 1 addition & 1 deletion bin/maxwell
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ base_dir=$(dirname $0)/..

CLASSPATH=$CLASSPATH:$base_dir/target/classes

for file in target/maxwell-*/*/lib/*.jar
for file in target/maxwell-*/*/lib/*.jar lib/*.jar
do
CLASSPATH=$CLASSPATH:$file
done
Expand Down
38 changes: 0 additions & 38 deletions bin/test_client

This file was deleted.

3 changes: 1 addition & 2 deletions src/main/antlr4/imports/column_definitions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ enum_value: STRING_LITERAL;

charset_def: (character_set | collation)+;
character_set: ((CHARACTER SET) | CHARSET) charset_name;
collation: COLLATE '='? (IDENT | STRING_LITERAL);

collation: COLLATE '='? (IDENT | STRING_LITERAL | QUOTED_IDENT);

nullability: (NOT NULL | NULL);
default_value: DEFAULT (literal | NULL);
Expand Down
2 changes: 2 additions & 0 deletions src/main/assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
<directory>${project.basedir}</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>bin/*</include>
<include>config.properties.example</include>
<include>README*</include>
<include>LICENSE*</include>
<include>NOTICE*</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public void onCompletion(RecordMetadata md, Exception e) {
}
}
}

}
public class MaxwellKafkaProducer extends AbstractProducer {
private final KafkaProducer<byte[], byte[]> kafka;
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/zendesk/maxwell/schema/SchemaStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ private void saveSchema() throws SQLException {

}
}
executeColumnInsert(columnData);
if ( columnData.size() > 0 )
executeColumnInsert(columnData);
}

private void executeColumnInsert(ArrayList<Object> columnData) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public String asJSON(Object value) {
}

private String asString(Object value) {
return enumValues[((Integer) value) - 1];
Integer i = (Integer) value;

if ( i == 0 )
return null;
else
return enumValues[((Integer) value) - 1];
}
}
20 changes: 16 additions & 4 deletions src/main/java/com/zendesk/maxwell/schema/ddl/SchemaChange.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

import org.antlr.v4.runtime.ANTLRInputStream;
import org.antlr.v4.runtime.CommonTokenStream;
Expand All @@ -20,16 +21,27 @@ public abstract class SchemaChange {

public abstract Schema apply(Schema originalSchema) throws SchemaSyncError;

private static final Set<String> SQL_BLACKLIST = new HashSet<String>();
private static final Set<Pattern> SQL_BLACKLIST = new HashSet<Pattern>();

static {
SQL_BLACKLIST.add("BEGIN");
SQL_BLACKLIST.add("COMMIT");
SQL_BLACKLIST.add(Pattern.compile("^BEGIN", Pattern.CASE_INSENSITIVE));
SQL_BLACKLIST.add(Pattern.compile("^COMMIT", Pattern.CASE_INSENSITIVE));
SQL_BLACKLIST.add(Pattern.compile("^GRANT", Pattern.CASE_INSENSITIVE));
}

private static boolean matchesBlacklist(String sql) {
for ( Pattern p : SQL_BLACKLIST ) {
if ( p.matcher(sql).find() )
return true;
}

return false;
}

public static List<SchemaChange> parse(String currentDB, String sql) {
if ( SQL_BLACKLIST.contains(sql))
if ( matchesBlacklist(sql) ) {
return null;
}

try {
ANTLRInputStream input = new ANTLRInputStream(sql);
Expand Down

0 comments on commit 8cbf185

Please sign in to comment.