Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more logs into MSSQL background thread #35496

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ protected void execSQL(final List<String> sqls) {
}
}

protected void execSQLNoLog(String sql) {
getDslContext().execute(sql);
}

protected void execInContainer(List<String> cmd) {
if (cmd.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,9 @@ protected void setup() {
super.setup();

// Enables cdc on MODELS_SCHEMA.MODELS_STREAM_NAME, giving CDC_ROLE_NAME select access.
final var enableCdcSqlFmt = """
EXEC sys.sp_cdc_enable_table
\t@source_schema = N'%s',
\t@source_name = N'%s',
\t@role_name = N'%s',
\t@supports_net_changes = 0""";
testdb.withCdc()
.with(enableCdcSqlFmt, modelsSchema(), MODELS_STREAM_NAME, CDC_ROLE_NAME)
.with(enableCdcSqlFmt, randomSchema(), RANDOM_TABLE_NAME, CDC_ROLE_NAME)
.withCdcForTable(modelsSchema(), MODELS_STREAM_NAME, CDC_ROLE_NAME)
.withCdcForTable(randomSchema(), RANDOM_TABLE_NAME, CDC_ROLE_NAME)
.withShortenedCapturePollingInterval();

// Create a test user to be used by the source, with proper permissions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.SQLDialect;
Expand Down Expand Up @@ -71,64 +70,159 @@ static public MsSQLTestDatabase in(final BaseImage imageName, final ContainerMod
.initialized();
}

private class MssqlTestDatabaseBackgroundThread extends Thread {
private abstract class AbstractMssqlTestDatabaseBackgroundThread extends Thread {

private volatile boolean stop = false;
protected volatile boolean stop = false;
protected Logger LOGGER = LoggerFactory.getLogger(this.getClass());

protected String formatLogLine(String logLine) {
String retVal = "SGX " + this.getClass().getSimpleName() + " databaseId=" + databaseId + ", containerId=" + containerId + " - " + logLine;
return retVal;
}

public void run() {
LOGGER.info(formatLogLine("Started new database " + getDatabaseName()));
boolean wasRunning = false;
while (!stop) {
try {
String agentStateSql = "EXEC master.dbo.xp_servicecontrol 'QueryState', N'SQLServerAGENT';";
LOGGER.info(formatLogLine("executing agentStateSql {}"), agentStateSql);
final var r = query(ctx -> ctx.fetch(agentStateSql).get(0));
String agentState = r.getValue(0).toString();
LOGGER.info(formatLogLine("agentState=" + agentState));
if ("Running.".equals(agentState)) {
LOGGER.info(formatLogLine("agent is running. Executing more queries..."));
wasRunning = true;
LOGGER.info(formatLogLine(String.format("sys.fn_cdc_get_max_lsn returned %s",
query(ctx -> ctx.fetch("SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;")).get(0).getValue(0))));
Result<Record> results = query(ctx -> ctx.fetch("SELECT start_lsn, tran_begin_time, tran_end_time, tran_id FROM cdc.lsn_time_mapping;"));
LOGGER.info(formatLogLine(String.format("lsn_time_mapping has %d rows: %s", results.size(), results.toString())));
} else if (wasRunning && !"Running.".equals(agentState)) {
LOGGER.info(formatLogLine("agent was running. agentState=" + agentState));
}
innerRun();
sleep(100);
} catch (final Throwable t) {
String exceptionAsString = StringUtils.join(ExceptionUtils.getStackFrames(t), "\n ");
LOGGER.info(formatLogLine("got exception " + exceptionAsString));
// String exceptionAsString = StringUtils.join(ExceptionUtils.getStackFrames(t), "\n ");
LOGGER.info(formatLogLine("got exception " + StringUtils.replace(t.getMessage(), "\n", "\\n")));
}
}
}

public abstract void innerRun() throws Exception;

}

private class MssqlTestDatabaseBackgroundThreadAgentState extends AbstractMssqlTestDatabaseBackgroundThread {

@Override
public void innerRun() throws Exception {
String agentStateSql = "EXEC master.dbo.xp_servicecontrol 'QueryState', N'SQLServerAGENT';";
LOGGER.info(formatLogLine("executing agentStateSql {}"), agentStateSql);
final var r = query(ctx -> ctx.fetch(agentStateSql).get(0));
String agentState = r.getValue(0).toString();
LOGGER.info(formatLogLine("agentState=" + agentState));
}

}

private class MssqlTestDatabaseBackgroundThreadFnCdcGetMaxLsn extends AbstractMssqlTestDatabaseBackgroundThread {

@Override
public void innerRun() throws Exception {
LOGGER.info(formatLogLine("querying fn_cdc_get_max_lsn"));
LOGGER.info(formatLogLine(String.format("sys.fn_cdc_get_max_lsn returned %s",
query(ctx -> ctx.fetch("SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;")).get(0)
.getValue(0))));
}

}

private class MssqlTestDatabaseBackgroundThreadLsnTimeMapping extends AbstractMssqlTestDatabaseBackgroundThread {

@Override
public void innerRun() throws Exception {
LOGGER.info(formatLogLine("querying lsn_time_mapping"));
Result<Record> results = query(ctx -> ctx.fetch(
"SELECT start_lsn, tran_begin_time, tran_end_time, tran_id FROM cdc.lsn_time_mapping;"));
LOGGER.info(formatLogLine(
String.format("lsn_time_mapping has %d rows: %s", results.size(),
results.toString())));
}

}

private class MssqlTestDatabaseBackgroundThreadEnableInternalTable extends AbstractMssqlTestDatabaseBackgroundThread {

@Override
public void innerRun() throws Exception {
LOGGER.info(formatLogLine("enabling CDC for internal table"));
withCdcForTable(getInternalSchemaName(),
getInternalTableName(), null);
stop = true;
LOGGER.info(formatLogLine("enabled CDC for internal table"));
}

}

private final MssqlTestDatabaseBackgroundThread bgThread;
private class MssqlTestDatabaseBackgroundThreadQueryChangeTables extends AbstractMssqlTestDatabaseBackgroundThread {

@Override
public void innerRun() throws Exception {
LOGGER.info(formatLogLine("querying cdc.change_tables"));
Result<Record> results = query(ctx -> ctx.fetch("""
SELECT OBJECT_SCHEMA_NAME(source_object_id, DB_ID('%s')),
OBJECT_NAME(source_object_id, DB_ID('%s')),
capture_instance,
object_id,
start_lsn FROM cdc.change_tables""".formatted(getDatabaseName(), getDatabaseName())));
LOGGER.info(formatLogLine(
String.format("change_tables has %d rows: %s", results.size(),
results.toString())));
}

}

private class MssqlTestDatabaseBackgroundThreadQueryInternalTable extends AbstractMssqlTestDatabaseBackgroundThread {

@Override
public void innerRun() throws Exception {
LOGGER.info(formatLogLine("querying internal table"));
Result<Record> results = query(ctx -> ctx.fetch("SELECT* FROM %s.%s".formatted(getInternalSchemaName(), getInternalTableName())));
LOGGER.info(formatLogLine(
String.format("internal table has %d rows: %s", results.size(),
results.toString())));
}

}

private final AbstractMssqlTestDatabaseBackgroundThread bgThreads[];

public MsSQLTestDatabase(final MSSQLServerContainer<?> container) {
super(container);
bgThread = new MssqlTestDatabaseBackgroundThread();
bgThreads = new AbstractMssqlTestDatabaseBackgroundThread[] {
new MssqlTestDatabaseBackgroundThreadAgentState(),
new MssqlTestDatabaseBackgroundThreadFnCdcGetMaxLsn(),
new MssqlTestDatabaseBackgroundThreadLsnTimeMapping(),
new MssqlTestDatabaseBackgroundThreadEnableInternalTable(),
new MssqlTestDatabaseBackgroundThreadQueryChangeTables(),
new MssqlTestDatabaseBackgroundThreadQueryInternalTable()};
LOGGER.info("SGX creating new database. databaseId=" + this.databaseId + ", databaseName=" + getDatabaseName());
}

@Override
public void initializedPostHook() {
bgThread.start();
for (var bgThread : bgThreads) {
bgThread.start();
}
}

public MsSQLTestDatabase withCdc() {
LOGGER.info("enabling CDC on database {} with id {}", getDatabaseName(), databaseId);
MsSQLTestDatabase retVal = with("EXEC sys.sp_cdc_enable_db;");
with("EXEC sys.sp_cdc_enable_db;");
LOGGER.info("CDC enabled on database {} with id {}", getDatabaseName(), databaseId);
try {
LOGGER.info("Sleeping");
Thread.sleep(60_000);
LOGGER.info("Done sleeping");
LOGGER.info("sleeping");
Thread.sleep(300_000);
LOGGER.info("resuming");
} catch (InterruptedException e) {

}
return retVal;
return this;
}

public synchronized MsSQLTestDatabase withCdcForTable(String schemaName, String tableName, String roleName) {
final var enableCdcSqlFmt = """
EXEC sys.sp_cdc_enable_table
\t@source_schema = N'%s',
\t@source_name = N'%s',
\t@role_name = %s,
\t@supports_net_changes = 0""";
String sqlRoleName = roleName == null ? "NULL" : "N'%s'".formatted(roleName);
return with(enableCdcSqlFmt.formatted(schemaName, tableName, sqlRoleName));
}

public MsSQLTestDatabase withoutCdc() {
Expand Down Expand Up @@ -216,13 +310,18 @@ public String getJdbcUrl() {
@Override
protected List<List<String>> inContainerBootstrapCmd() {
return List.of(
mssqlCmd(List.of(String.format("CREATE DATABASE %s", getDatabaseName()))),
mssqlCmd(List.of(
mssqlCmdInMasterDb(List.of(String.format("CREATE DATABASE %s", getDatabaseName()))),
mssqlCmdInMasterDb(List.of(
String.format("USE %s", getDatabaseName()),
String.format("CREATE LOGIN %s WITH PASSWORD = '%s', DEFAULT_DATABASE = %s", getUserName(), getPassword(), getDatabaseName()),
String.format("ALTER SERVER ROLE [sysadmin] ADD MEMBER %s", getUserName()),
String.format("CREATE USER %s FOR LOGIN %s WITH DEFAULT_SCHEMA = [dbo]", getUserName(), getUserName()),
String.format("ALTER ROLE [db_owner] ADD MEMBER %s", getUserName()))));
String.format("ALTER ROLE [db_owner] ADD MEMBER %s", getUserName()))),
mssqlCmd(List.of(String.format("CREATE SCHEMA %s", getInternalSchemaName()))),
mssqlCmd(List.of(String.format("CREATE TABLE %s.%s (id INTEGER PRIMARY KEY)", getInternalSchemaName(), getInternalTableName()))),
mssqlCmd(List.of(String.format("GRANT ALL ON SCHEMA :: %s TO PUBLIC", getInternalSchemaName()))),
mssqlCmd(List.of(String.format("GRANT ALL ON %s.%s TO PUBLIC", getInternalSchemaName(), getInternalTableName()))));

}

/**
Expand All @@ -236,17 +335,26 @@ protected List<String> inContainerUndoBootstrapCmd() {
}

public void dropDatabaseAndUser() {
execInContainer(mssqlCmd(List.of(
execInContainer(mssqlCmdInMasterDb(List.of(
String.format("USE master"),
String.format("ALTER DATABASE %s SET single_user WITH ROLLBACK IMMEDIATE", getDatabaseName()),
String.format("DROP DATABASE %s", getDatabaseName()))));
}

public List<String> mssqlCmd(final List<String> sql) {
return List.of("/opt/mssql-tools/bin/sqlcmd",
public List<String> mssqlCmdInMasterDb(final List<String> sql) {
return Arrays.asList("/opt/mssql-tools/bin/sqlcmd",
"-U", getContainer().getUsername(),
"-P", getContainer().getPassword(),
"-Q", StringUtils.join("; "),
"-Q", StringUtils.join(sql, "; "),
"-d", "master",
"-b", "-e");
}

public List<String> mssqlCmd(final List<String> sql) {
return Arrays.asList("/opt/mssql-tools/bin/sqlcmd",
"-U", getUserName(),
"-P", getPassword(),
"-Q", StringUtils.join(sql, "; "),
"-b", "-e");
}

Expand All @@ -260,6 +368,14 @@ public SQLDialect getSqlDialect() {
return SQLDialect.DEFAULT;
}

private String getInternalTableName() {
return withNamespace("internal_table_");
}

private String getInternalSchemaName() {
return withNamespace("internal_schema_");
}

public static enum CertificateKey {

CA(true),
Expand Down Expand Up @@ -299,7 +415,9 @@ public synchronized String getCertificate(final CertificateKey certificateKey) {
}

public void close() {
bgThread.stop = true;
for (var bgThread : bgThreads) {
bgThread.stop = true;
}
super.close();
}

Expand Down
Loading