Skip to content

Commit

Permalink
Reinplementation for MR reorg#251
Browse files Browse the repository at this point in the history
Use CREATE TABLE as SELECT allowing parallel plans if it is possible.
  • Loading branch information
Evgeny Brednya committed Apr 22, 2024
1 parent aae9f2f commit 96b0da4
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 34 deletions.
85 changes: 59 additions & 26 deletions bin/pg_repack.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ static bool advisory_lock(PGconn *conn, const char *relid);
static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact);
static bool kill_ddl(PGconn *conn, Oid relid, bool terminate);
static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name);
static void append_order_by_command(StringInfoData *command, const char *orderby, const char *ckey);

#define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
#define SQLSTATE_UNDEFINED_FUNCTION "42883"
Expand Down Expand Up @@ -787,7 +788,7 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
/* acquire target tables */
appendStringInfoString(&sql,
"SELECT t.*,"
" coalesce(v.tablespace, t.tablespace_orig) as tablespace_dest"
" quote_ident(coalesce(v.tablespace, t.tablespace_orig)) as tablespace_dest"
" FROM repack.tables t, "
" (VALUES ($1::text)) as v (tablespace)"
" WHERE ");
Expand Down Expand Up @@ -899,6 +900,7 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
for (i = 0; i < num; i++)
{
repack_table table;
StringInfoData create_sql;
StringInfoData copy_sql;
const char *ckey;
int c = 0;
Expand Down Expand Up @@ -938,32 +940,37 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
table.sql_pop = getstr(res, i, c++);
table.dest_tablespace = getstr(res, i, c++);

/* Craft Copy SQL */
/* Craft Create_Table and Copy SQLs */
initStringInfo(&create_sql);
printfStringInfo(&create_sql, table.create_table, table.dest_tablespace);

/*
* We need to split the copying of data into the new table into
* separate commands if we need any per-column storage settings.
* Otherwise use just "create table as"
*/
initStringInfo(&copy_sql);
appendStringInfoString(&copy_sql, table.copy_data);
if (!orderby)

/* if columns are altered */
if (table.alter_col_storage)
{
if (ckey != NULL)
{
/* CLUSTER mode */
appendStringInfoString(&copy_sql, " ORDER BY ");
appendStringInfoString(&copy_sql, ckey);
}
/* no data in CREATE TABLE */
appendStringInfoString(&create_sql, " WITH NO DATA");

/* else, VACUUM FULL mode (non-clustered tables) */
}
else if (!orderby[0])
{
/* VACUUM FULL mode (for clustered tables too), do nothing */
}
else
{
/* User specified ORDER BY */
appendStringInfoString(&copy_sql, " ORDER BY ");
appendStringInfoString(&copy_sql, orderby);
/* add ORDER BY in copy data */
appendStringInfoString(&copy_sql, table.copy_data);
append_order_by_command(&copy_sql, orderby, ckey);
table.copy_data = copy_sql.data;

} else {

/* copy all data in CREATE TABLE AS with ORDER BY
* allowing parallel SELECT executions */
append_order_by_command(&create_sql, orderby, ckey);
table.copy_data = NULL;
}
table.copy_data = copy_sql.data;

table.create_table = create_sql.data;

repack_one_table(&table, orderby);
}
Expand All @@ -977,6 +984,33 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
return ret;
}

static void
append_order_by_command(StringInfoData *command, const char *orderby, const char *ckey)
{
if (!orderby)

{
if (ckey != NULL)
{
/* CLUSTER mode */
appendStringInfoString(command, " ORDER BY ");
appendStringInfoString(command, ckey);
}

/* else, VACUUM FULL mode (non-clustered tables) */
}
else if (!orderby[0])
{
/* VACUUM FULL mode (for clustered tables too), do nothing */
}
else
{
/* User specified ORDER BY */
appendStringInfoString(command, " ORDER BY ");
appendStringInfoString(command, orderby);
}
}

static int
apply_log(PGconn *conn, const repack_table *table, int count)
{
Expand Down Expand Up @@ -1524,12 +1558,11 @@ repack_one_table(repack_table *table, const char *orderby)
* Before copying data to the target table, we need to set the column storage
* type if its storage type has been changed from the type default.
*/
params[0] = utoa(table->target_oid, buffer);
params[1] = table->dest_tablespace;
command(table->create_table, 2, params);
command(table->create_table, 0, NULL);
if (table->alter_col_storage)
command(table->alter_col_storage, 0, NULL);
command(table->copy_data, 0, NULL);
if (table->copy_data)
command(table->copy_data, 0, NULL);
temp_obj_num++;
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
if (table->drop_columns)
Expand Down
14 changes: 6 additions & 8 deletions lib/pg_repack.sql.in
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,15 @@ END
$$
LANGUAGE plpgsql;

CREATE FUNCTION repack.create_table(oid, name) RETURNS void AS
CREATE FUNCTION repack.create_table(oid) RETURNS text AS
$$
BEGIN
EXECUTE 'CREATE TABLE repack.table_' || $1 ||
SELECT 'CREATE TABLE repack.table_' || $1 ||
' WITH (' || repack.get_storage_param($1) || ') ' ||
' TABLESPACE ' || quote_ident($2) ||
' TABLESPACE %s' ||
' AS SELECT ' || repack.get_columns_for_create_as($1) ||
' FROM ONLY ' || repack.oid2text($1) || ' WITH NO DATA';
END
' FROM ONLY ' || repack.oid2text($1);
$$
LANGUAGE plpgsql;
LANGUAGE sql STABLE STRICT;

CREATE FUNCTION repack.create_index_type(oid, oid) RETURNS void AS
$$
Expand Down Expand Up @@ -299,7 +297,7 @@ CREATE VIEW repack.tables AS
'SELECT repack.create_log_table(' || R.oid || ')' AS create_log,
repack.get_create_trigger(R.oid, PK.indexrelid) AS create_trigger,
repack.get_enable_trigger(R.oid) as enable_trigger,
'SELECT repack.create_table($1, $2)'::text AS create_table,
repack.create_table(R.oid)::text AS create_table,
coalesce(S.spcname, S2.spcname) AS tablespace_orig,
'INSERT INTO repack.table_' || R.oid || ' SELECT ' || repack.get_columns_for_create_as(R.oid) || ' FROM ONLY ' || repack.oid2text(R.oid) AS copy_data,
repack.get_alter_col_storage(R.oid) AS alter_col_storage,
Expand Down

0 comments on commit 96b0da4

Please sign in to comment.