diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java index 82b1d533807e..f29375ae908d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java @@ -211,6 +211,10 @@ protected void execSQL(final List sqls) { } } + protected void execSQLNoLog(String sql) { + getDslContext().execute(sql); + } + protected void execInContainer(List cmd) { if (cmd.isEmpty()) { return; diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java index db2ce275734b..48236462c1ef 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java @@ -136,15 +136,9 @@ protected void setup() { super.setup(); // Enables cdc on MODELS_SCHEMA.MODELS_STREAM_NAME, giving CDC_ROLE_NAME select access. - final var enableCdcSqlFmt = """ - EXEC sys.sp_cdc_enable_table - \t@source_schema = N'%s', - \t@source_name = N'%s', - \t@role_name = N'%s', - \t@supports_net_changes = 0"""; testdb.withCdc() - .with(enableCdcSqlFmt, modelsSchema(), MODELS_STREAM_NAME, CDC_ROLE_NAME) - .with(enableCdcSqlFmt, randomSchema(), RANDOM_TABLE_NAME, CDC_ROLE_NAME) + .withCdcForTable(modelsSchema(), MODELS_STREAM_NAME, CDC_ROLE_NAME) + .withCdcForTable(randomSchema(), RANDOM_TABLE_NAME, CDC_ROLE_NAME) .withShortenedCapturePollingInterval(); // Create a test user to be used by the source, with proper permissions. diff --git a/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java b/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java index 8514efb63b19..b3e51aafc985 100644 --- a/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java +++ b/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java @@ -11,14 +11,13 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.sql.SQLException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.jooq.Record; import org.jooq.Result; import org.jooq.SQLDialect; @@ -71,64 +70,159 @@ static public MsSQLTestDatabase in(final BaseImage imageName, final ContainerMod .initialized(); } - private class MssqlTestDatabaseBackgroundThread extends Thread { + private abstract class AbstractMssqlTestDatabaseBackgroundThread extends Thread { - private volatile boolean stop = false; + protected volatile boolean stop = false; + protected Logger LOGGER = LoggerFactory.getLogger(this.getClass()); + + protected String formatLogLine(String logLine) { + String retVal = "SGX " + this.getClass().getSimpleName() + " databaseId=" + databaseId + ", containerId=" + containerId + " - " + logLine; + return retVal; + } public void run() { - LOGGER.info(formatLogLine("Started new database " + getDatabaseName())); - boolean wasRunning = false; while (!stop) { try { - String agentStateSql = "EXEC master.dbo.xp_servicecontrol 'QueryState', N'SQLServerAGENT';"; - LOGGER.info(formatLogLine("executing agentStateSql {}"), agentStateSql); - final var r = query(ctx -> ctx.fetch(agentStateSql).get(0)); - String agentState = r.getValue(0).toString(); - LOGGER.info(formatLogLine("agentState=" + agentState)); - if ("Running.".equals(agentState)) { - LOGGER.info(formatLogLine("agent is running. Executing more queries...")); - wasRunning = true; - LOGGER.info(formatLogLine(String.format("sys.fn_cdc_get_max_lsn returned %s", - query(ctx -> ctx.fetch("SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;")).get(0).getValue(0)))); - Result results = query(ctx -> ctx.fetch("SELECT start_lsn, tran_begin_time, tran_end_time, tran_id FROM cdc.lsn_time_mapping;")); - LOGGER.info(formatLogLine(String.format("lsn_time_mapping has %d rows: %s", results.size(), results.toString()))); - } else if (wasRunning && !"Running.".equals(agentState)) { - LOGGER.info(formatLogLine("agent was running. agentState=" + agentState)); - } + innerRun(); + sleep(100); } catch (final Throwable t) { - String exceptionAsString = StringUtils.join(ExceptionUtils.getStackFrames(t), "\n "); - LOGGER.info(formatLogLine("got exception " + exceptionAsString)); + // String exceptionAsString = StringUtils.join(ExceptionUtils.getStackFrames(t), "\n "); + LOGGER.info(formatLogLine("got exception " + StringUtils.replace(t.getMessage(), "\n", "\\n"))); } } } + public abstract void innerRun() throws Exception; + + } + + private class MssqlTestDatabaseBackgroundThreadAgentState extends AbstractMssqlTestDatabaseBackgroundThread { + + @Override + public void innerRun() throws Exception { + String agentStateSql = "EXEC master.dbo.xp_servicecontrol 'QueryState', N'SQLServerAGENT';"; + LOGGER.info(formatLogLine("executing agentStateSql {}"), agentStateSql); + final var r = query(ctx -> ctx.fetch(agentStateSql).get(0)); + String agentState = r.getValue(0).toString(); + LOGGER.info(formatLogLine("agentState=" + agentState)); + } + + } + + private class MssqlTestDatabaseBackgroundThreadFnCdcGetMaxLsn extends AbstractMssqlTestDatabaseBackgroundThread { + + @Override + public void innerRun() throws Exception { + LOGGER.info(formatLogLine("querying fn_cdc_get_max_lsn")); + LOGGER.info(formatLogLine(String.format("sys.fn_cdc_get_max_lsn returned %s", + query(ctx -> ctx.fetch("SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;")).get(0) + .getValue(0)))); + } + + } + + private class MssqlTestDatabaseBackgroundThreadLsnTimeMapping extends AbstractMssqlTestDatabaseBackgroundThread { + + @Override + public void innerRun() throws Exception { + LOGGER.info(formatLogLine("querying lsn_time_mapping")); + Result results = query(ctx -> ctx.fetch( + "SELECT start_lsn, tran_begin_time, tran_end_time, tran_id FROM cdc.lsn_time_mapping;")); + LOGGER.info(formatLogLine( + String.format("lsn_time_mapping has %d rows: %s", results.size(), + results.toString()))); + } + + } + + private class MssqlTestDatabaseBackgroundThreadEnableInternalTable extends AbstractMssqlTestDatabaseBackgroundThread { + + @Override + public void innerRun() throws Exception { + LOGGER.info(formatLogLine("enabling CDC for internal table")); + withCdcForTable(getInternalSchemaName(), + getInternalTableName(), null); + stop = true; + LOGGER.info(formatLogLine("enabled CDC for internal table")); + } + } - private final MssqlTestDatabaseBackgroundThread bgThread; + private class MssqlTestDatabaseBackgroundThreadQueryChangeTables extends AbstractMssqlTestDatabaseBackgroundThread { + + @Override + public void innerRun() throws Exception { + LOGGER.info(formatLogLine("querying cdc.change_tables")); + Result results = query(ctx -> ctx.fetch(""" + SELECT OBJECT_SCHEMA_NAME(source_object_id, DB_ID('%s')), + OBJECT_NAME(source_object_id, DB_ID('%s')), + capture_instance, + object_id, + start_lsn FROM cdc.change_tables""".formatted(getDatabaseName(), getDatabaseName()))); + LOGGER.info(formatLogLine( + String.format("change_tables has %d rows: %s", results.size(), + results.toString()))); + } + + } + + private class MssqlTestDatabaseBackgroundThreadQueryInternalTable extends AbstractMssqlTestDatabaseBackgroundThread { + + @Override + public void innerRun() throws Exception { + LOGGER.info(formatLogLine("querying internal table")); + Result results = query(ctx -> ctx.fetch("SELECT* FROM %s.%s".formatted(getInternalSchemaName(), getInternalTableName()))); + LOGGER.info(formatLogLine( + String.format("internal table has %d rows: %s", results.size(), + results.toString()))); + } + + } + + private final AbstractMssqlTestDatabaseBackgroundThread bgThreads[]; public MsSQLTestDatabase(final MSSQLServerContainer container) { super(container); - bgThread = new MssqlTestDatabaseBackgroundThread(); + bgThreads = new AbstractMssqlTestDatabaseBackgroundThread[] { + new MssqlTestDatabaseBackgroundThreadAgentState(), + new MssqlTestDatabaseBackgroundThreadFnCdcGetMaxLsn(), + new MssqlTestDatabaseBackgroundThreadLsnTimeMapping(), + new MssqlTestDatabaseBackgroundThreadEnableInternalTable(), + new MssqlTestDatabaseBackgroundThreadQueryChangeTables(), + new MssqlTestDatabaseBackgroundThreadQueryInternalTable()}; LOGGER.info("SGX creating new database. databaseId=" + this.databaseId + ", databaseName=" + getDatabaseName()); } @Override public void initializedPostHook() { - bgThread.start(); + for (var bgThread : bgThreads) { + bgThread.start(); + } } public MsSQLTestDatabase withCdc() { LOGGER.info("enabling CDC on database {} with id {}", getDatabaseName(), databaseId); - MsSQLTestDatabase retVal = with("EXEC sys.sp_cdc_enable_db;"); + with("EXEC sys.sp_cdc_enable_db;"); LOGGER.info("CDC enabled on database {} with id {}", getDatabaseName(), databaseId); try { - LOGGER.info("Sleeping"); - Thread.sleep(60_000); - LOGGER.info("Done sleeping"); + LOGGER.info("sleeping"); + Thread.sleep(300_000); + LOGGER.info("resuming"); } catch (InterruptedException e) { } - return retVal; + return this; + } + + public synchronized MsSQLTestDatabase withCdcForTable(String schemaName, String tableName, String roleName) { + final var enableCdcSqlFmt = """ + EXEC sys.sp_cdc_enable_table + \t@source_schema = N'%s', + \t@source_name = N'%s', + \t@role_name = %s, + \t@supports_net_changes = 0"""; + String sqlRoleName = roleName == null ? "NULL" : "N'%s'".formatted(roleName); + return with(enableCdcSqlFmt.formatted(schemaName, tableName, sqlRoleName)); } public MsSQLTestDatabase withoutCdc() { @@ -216,13 +310,18 @@ public String getJdbcUrl() { @Override protected List> inContainerBootstrapCmd() { return List.of( - mssqlCmd(List.of(String.format("CREATE DATABASE %s", getDatabaseName()))), - mssqlCmd(List.of( + mssqlCmdInMasterDb(List.of(String.format("CREATE DATABASE %s", getDatabaseName()))), + mssqlCmdInMasterDb(List.of( String.format("USE %s", getDatabaseName()), String.format("CREATE LOGIN %s WITH PASSWORD = '%s', DEFAULT_DATABASE = %s", getUserName(), getPassword(), getDatabaseName()), String.format("ALTER SERVER ROLE [sysadmin] ADD MEMBER %s", getUserName()), String.format("CREATE USER %s FOR LOGIN %s WITH DEFAULT_SCHEMA = [dbo]", getUserName(), getUserName()), - String.format("ALTER ROLE [db_owner] ADD MEMBER %s", getUserName())))); + String.format("ALTER ROLE [db_owner] ADD MEMBER %s", getUserName()))), + mssqlCmd(List.of(String.format("CREATE SCHEMA %s", getInternalSchemaName()))), + mssqlCmd(List.of(String.format("CREATE TABLE %s.%s (id INTEGER PRIMARY KEY)", getInternalSchemaName(), getInternalTableName()))), + mssqlCmd(List.of(String.format("GRANT ALL ON SCHEMA :: %s TO PUBLIC", getInternalSchemaName()))), + mssqlCmd(List.of(String.format("GRANT ALL ON %s.%s TO PUBLIC", getInternalSchemaName(), getInternalTableName())))); + } /** @@ -236,17 +335,26 @@ protected List inContainerUndoBootstrapCmd() { } public void dropDatabaseAndUser() { - execInContainer(mssqlCmd(List.of( + execInContainer(mssqlCmdInMasterDb(List.of( String.format("USE master"), String.format("ALTER DATABASE %s SET single_user WITH ROLLBACK IMMEDIATE", getDatabaseName()), String.format("DROP DATABASE %s", getDatabaseName())))); } - public List mssqlCmd(final List sql) { - return List.of("/opt/mssql-tools/bin/sqlcmd", + public List mssqlCmdInMasterDb(final List sql) { + return Arrays.asList("/opt/mssql-tools/bin/sqlcmd", "-U", getContainer().getUsername(), "-P", getContainer().getPassword(), - "-Q", StringUtils.join("; "), + "-Q", StringUtils.join(sql, "; "), + "-d", "master", + "-b", "-e"); + } + + public List mssqlCmd(final List sql) { + return Arrays.asList("/opt/mssql-tools/bin/sqlcmd", + "-U", getUserName(), + "-P", getPassword(), + "-Q", StringUtils.join(sql, "; "), "-b", "-e"); } @@ -260,6 +368,14 @@ public SQLDialect getSqlDialect() { return SQLDialect.DEFAULT; } + private String getInternalTableName() { + return withNamespace("internal_table_"); + } + + private String getInternalSchemaName() { + return withNamespace("internal_schema_"); + } + public static enum CertificateKey { CA(true), @@ -299,7 +415,9 @@ public synchronized String getCertificate(final CertificateKey certificateKey) { } public void close() { - bgThread.stop = true; + for (var bgThread : bgThreads) { + bgThread.stop = true; + } super.close(); }