Skip to content

Commit

Permalink
add more logs into MSSQL background thread
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Feb 24, 2024
1 parent 459f923 commit 5bae05a
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ protected void execSQL(final List<String> sqls) {
}
}

protected void execSQLNoLog(String sql) {
getDslContext().execute(sql);
}

protected void execInContainer(List<String> cmd) {
if (cmd.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,9 @@ protected void setup() {
super.setup();

// Enables cdc on MODELS_SCHEMA.MODELS_STREAM_NAME, giving CDC_ROLE_NAME select access.
final var enableCdcSqlFmt = """
EXEC sys.sp_cdc_enable_table
\t@source_schema = N'%s',
\t@source_name = N'%s',
\t@role_name = N'%s',
\t@supports_net_changes = 0""";
testdb.withCdc()
.with(enableCdcSqlFmt, modelsSchema(), MODELS_STREAM_NAME, CDC_ROLE_NAME)
.with(enableCdcSqlFmt, randomSchema(), RANDOM_TABLE_NAME, CDC_ROLE_NAME)
.withCdcForTable(modelsSchema(), MODELS_STREAM_NAME, CDC_ROLE_NAME)
.withCdcForTable(randomSchema(), RANDOM_TABLE_NAME, CDC_ROLE_NAME)
.withShortenedCapturePollingInterval();

// Create a test user to be used by the source, with proper permissions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.SQLDialect;
Expand Down Expand Up @@ -71,64 +70,159 @@ static public MsSQLTestDatabase in(final BaseImage imageName, final ContainerMod
.initialized();
}

private class MssqlTestDatabaseBackgroundThread extends Thread {
private abstract class AbstractMssqlTestDatabaseBackgroundThread extends Thread {

private volatile boolean stop = false;
protected volatile boolean stop = false;
protected Logger LOGGER = LoggerFactory.getLogger(this.getClass());

protected String formatLogLine(String logLine) {
String retVal = "SGX " + this.getClass().getSimpleName() + " databaseId=" + databaseId + ", containerId=" + containerId + " - " + logLine;
return retVal;
}

public void run() {
LOGGER.info(formatLogLine("Started new database " + getDatabaseName()));
boolean wasRunning = false;
while (!stop) {
try {
String agentStateSql = "EXEC master.dbo.xp_servicecontrol 'QueryState', N'SQLServerAGENT';";
LOGGER.info(formatLogLine("executing agentStateSql {}"), agentStateSql);
final var r = query(ctx -> ctx.fetch(agentStateSql).get(0));
String agentState = r.getValue(0).toString();
LOGGER.info(formatLogLine("agentState=" + agentState));
if ("Running.".equals(agentState)) {
LOGGER.info(formatLogLine("agent is running. Executing more queries..."));
wasRunning = true;
LOGGER.info(formatLogLine(String.format("sys.fn_cdc_get_max_lsn returned %s",
query(ctx -> ctx.fetch("SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;")).get(0).getValue(0))));
Result<Record> results = query(ctx -> ctx.fetch("SELECT start_lsn, tran_begin_time, tran_end_time, tran_id FROM cdc.lsn_time_mapping;"));
LOGGER.info(formatLogLine(String.format("lsn_time_mapping has %d rows: %s", results.size(), results.toString())));
} else if (wasRunning && !"Running.".equals(agentState)) {
LOGGER.info(formatLogLine("agent was running. agentState=" + agentState));
}
innerRun();
sleep(100);
} catch (final Throwable t) {
String exceptionAsString = StringUtils.join(ExceptionUtils.getStackFrames(t), "\n ");
LOGGER.info(formatLogLine("got exception " + exceptionAsString));
// String exceptionAsString = StringUtils.join(ExceptionUtils.getStackFrames(t), "\n ");
LOGGER.info(formatLogLine("got exception " + StringUtils.replace(t.getMessage(), "\n", "\\n")));
}
}
}

public abstract void innerRun() throws Exception;

}

private class MssqlTestDatabaseBackgroundThreadAgentState extends AbstractMssqlTestDatabaseBackgroundThread {

@Override
public void innerRun() throws Exception {
String agentStateSql = "EXEC master.dbo.xp_servicecontrol 'QueryState', N'SQLServerAGENT';";
LOGGER.info(formatLogLine("executing agentStateSql {}"), agentStateSql);
final var r = query(ctx -> ctx.fetch(agentStateSql).get(0));
String agentState = r.getValue(0).toString();
LOGGER.info(formatLogLine("agentState=" + agentState));
}

}

private class MssqlTestDatabaseBackgroundThreadFnCdcGetMaxLsn extends AbstractMssqlTestDatabaseBackgroundThread {

@Override
public void innerRun() throws Exception {
LOGGER.info(formatLogLine("querying fn_cdc_get_max_lsn"));
LOGGER.info(formatLogLine(String.format("sys.fn_cdc_get_max_lsn returned %s",
query(ctx -> ctx.fetch("SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;")).get(0)
.getValue(0))));
}

}

private class MssqlTestDatabaseBackgroundThreadLsnTimeMapping extends AbstractMssqlTestDatabaseBackgroundThread {

@Override
public void innerRun() throws Exception {
LOGGER.info(formatLogLine("querying lsn_time_mapping"));
Result<Record> results = query(ctx -> ctx.fetch(
"SELECT start_lsn, tran_begin_time, tran_end_time, tran_id FROM cdc.lsn_time_mapping;"));
LOGGER.info(formatLogLine(
String.format("lsn_time_mapping has %d rows: %s", results.size(),
results.toString())));
}

}

private class MssqlTestDatabaseBackgroundThreadEnableInternalTable extends AbstractMssqlTestDatabaseBackgroundThread {

@Override
public void innerRun() throws Exception {
LOGGER.info(formatLogLine("enabling CDC for internal table"));
withCdcForTable(getInternalSchemaName(),
getInternalTableName(), null);
stop = true;
LOGGER.info(formatLogLine("enabled CDC for internal table"));
}

}

private final MssqlTestDatabaseBackgroundThread bgThread;
private class MssqlTestDatabaseBackgroundThreadQueryChangeTables extends AbstractMssqlTestDatabaseBackgroundThread {

@Override
public void innerRun() throws Exception {
LOGGER.info(formatLogLine("querying cdc.change_tables"));
Result<Record> results = query(ctx -> ctx.fetch("""
SELECT OBJECT_SCHEMA_NAME(source_object_id, DB_ID('%s')),
OBJECT_NAME(source_object_id, DB_ID('%s')),
capture_instance,
object_id,
start_lsn FROM cdc.change_tables""".formatted(getDatabaseName(), getDatabaseName())));
LOGGER.info(formatLogLine(
String.format("change_tables has %d rows: %s", results.size(),
results.toString())));
}

}

private class MssqlTestDatabaseBackgroundThreadQueryInternalTable extends AbstractMssqlTestDatabaseBackgroundThread {

@Override
public void innerRun() throws Exception {
LOGGER.info(formatLogLine("querying internal table"));
Result<Record> results = query(ctx -> ctx.fetch("SELECT* FROM %s.%s".formatted(getInternalSchemaName(), getInternalTableName())));
LOGGER.info(formatLogLine(
String.format("internal table has %d rows: %s", results.size(),
results.toString())));
}

}

private final AbstractMssqlTestDatabaseBackgroundThread bgThreads[];

public MsSQLTestDatabase(final MSSQLServerContainer<?> container) {
super(container);
bgThread = new MssqlTestDatabaseBackgroundThread();
bgThreads = new AbstractMssqlTestDatabaseBackgroundThread[] {
new MssqlTestDatabaseBackgroundThreadAgentState(),
new MssqlTestDatabaseBackgroundThreadFnCdcGetMaxLsn(),
new MssqlTestDatabaseBackgroundThreadLsnTimeMapping(),
new MssqlTestDatabaseBackgroundThreadEnableInternalTable(),
new MssqlTestDatabaseBackgroundThreadQueryChangeTables(),
new MssqlTestDatabaseBackgroundThreadQueryInternalTable()};
LOGGER.info("SGX creating new database. databaseId=" + this.databaseId + ", databaseName=" + getDatabaseName());
}

@Override
public void initializedPostHook() {
bgThread.start();
for (var bgThread : bgThreads) {
bgThread.start();
}
}

public MsSQLTestDatabase withCdc() {
LOGGER.info("enabling CDC on database {} with id {}", getDatabaseName(), databaseId);
MsSQLTestDatabase retVal = with("EXEC sys.sp_cdc_enable_db;");
with("EXEC sys.sp_cdc_enable_db;");
LOGGER.info("CDC enabled on database {} with id {}", getDatabaseName(), databaseId);
try {
LOGGER.info("Sleeping");
Thread.sleep(60_000);
LOGGER.info("Done sleeping");
LOGGER.info("sleeping");
Thread.sleep(300_000);
LOGGER.info("resuming");
} catch (InterruptedException e) {

}
return retVal;
return this;
}

public synchronized MsSQLTestDatabase withCdcForTable(String schemaName, String tableName, String roleName) {
final var enableCdcSqlFmt = """
EXEC sys.sp_cdc_enable_table
\t@source_schema = N'%s',
\t@source_name = N'%s',
\t@role_name = %s,
\t@supports_net_changes = 0""";
String sqlRoleName = roleName == null ? "NULL" : "N'%s'".formatted(roleName);
return with(enableCdcSqlFmt.formatted(schemaName, tableName, sqlRoleName));
}

public MsSQLTestDatabase withoutCdc() {
Expand Down Expand Up @@ -216,13 +310,18 @@ public String getJdbcUrl() {
@Override
protected List<List<String>> inContainerBootstrapCmd() {
return List.of(
mssqlCmd(List.of(String.format("CREATE DATABASE %s", getDatabaseName()))),
mssqlCmd(List.of(
mssqlCmdInMasterDb(List.of(String.format("CREATE DATABASE %s", getDatabaseName()))),
mssqlCmdInMasterDb(List.of(
String.format("USE %s", getDatabaseName()),
String.format("CREATE LOGIN %s WITH PASSWORD = '%s', DEFAULT_DATABASE = %s", getUserName(), getPassword(), getDatabaseName()),
String.format("ALTER SERVER ROLE [sysadmin] ADD MEMBER %s", getUserName()),
String.format("CREATE USER %s FOR LOGIN %s WITH DEFAULT_SCHEMA = [dbo]", getUserName(), getUserName()),
String.format("ALTER ROLE [db_owner] ADD MEMBER %s", getUserName()))));
String.format("ALTER ROLE [db_owner] ADD MEMBER %s", getUserName()))),
mssqlCmd(List.of(String.format("CREATE SCHEMA %s", getInternalSchemaName()))),
mssqlCmd(List.of(String.format("CREATE TABLE %s.%s (id INTEGER PRIMARY KEY)", getInternalSchemaName(), getInternalTableName()))),
mssqlCmd(List.of(String.format("GRANT ALL ON SCHEMA :: %s TO PUBLIC", getInternalSchemaName()))),
mssqlCmd(List.of(String.format("GRANT ALL ON %s.%s TO PUBLIC", getInternalSchemaName(), getInternalTableName()))));

}

/**
Expand All @@ -236,17 +335,26 @@ protected List<String> inContainerUndoBootstrapCmd() {
}

public void dropDatabaseAndUser() {
execInContainer(mssqlCmd(List.of(
execInContainer(mssqlCmdInMasterDb(List.of(
String.format("USE master"),
String.format("ALTER DATABASE %s SET single_user WITH ROLLBACK IMMEDIATE", getDatabaseName()),
String.format("DROP DATABASE %s", getDatabaseName()))));
}

public List<String> mssqlCmd(final List<String> sql) {
return List.of("/opt/mssql-tools/bin/sqlcmd",
public List<String> mssqlCmdInMasterDb(final List<String> sql) {
return Arrays.asList("/opt/mssql-tools/bin/sqlcmd",
"-U", getContainer().getUsername(),
"-P", getContainer().getPassword(),
"-Q", StringUtils.join("; "),
"-Q", StringUtils.join(sql, "; "),
"-d", "master",
"-b", "-e");
}

public List<String> mssqlCmd(final List<String> sql) {
return Arrays.asList("/opt/mssql-tools/bin/sqlcmd",
"-U", getUserName(),
"-P", getPassword(),
"-Q", StringUtils.join(sql, "; "),
"-b", "-e");
}

Expand All @@ -260,6 +368,14 @@ public SQLDialect getSqlDialect() {
return SQLDialect.DEFAULT;
}

private String getInternalTableName() {
return withNamespace("internal_table_");
}

private String getInternalSchemaName() {
return withNamespace("internal_schema_");
}

public static enum CertificateKey {

CA(true),
Expand Down Expand Up @@ -299,7 +415,9 @@ public synchronized String getCertificate(final CertificateKey certificateKey) {
}

public void close() {
bgThread.stop = true;
for (var bgThread : bgThreads) {
bgThread.stop = true;
}
super.close();
}

Expand Down

0 comments on commit 5bae05a

Please sign in to comment.