Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reinplementation for MR #251 #395

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 59 additions & 26 deletions bin/pg_repack.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ static bool advisory_lock(PGconn *conn, const char *relid);
static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact);
static bool kill_ddl(PGconn *conn, Oid relid, bool terminate);
static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name);
static void append_order_by_command(StringInfoData *command, const char *orderby, const char *ckey);

#define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
#define SQLSTATE_UNDEFINED_FUNCTION "42883"
Expand Down Expand Up @@ -787,7 +788,7 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
/* acquire target tables */
appendStringInfoString(&sql,
"SELECT t.*,"
" coalesce(v.tablespace, t.tablespace_orig) as tablespace_dest"
" quote_ident(coalesce(v.tablespace, t.tablespace_orig)) as tablespace_dest"
" FROM repack.tables t, "
" (VALUES ($1::text)) as v (tablespace)"
" WHERE ");
Expand Down Expand Up @@ -899,6 +900,7 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
for (i = 0; i < num; i++)
{
repack_table table;
StringInfoData create_sql;
StringInfoData copy_sql;
const char *ckey;
int c = 0;
Expand Down Expand Up @@ -938,32 +940,37 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
table.sql_pop = getstr(res, i, c++);
table.dest_tablespace = getstr(res, i, c++);

/* Craft Copy SQL */
/* Craft Create_Table and Copy SQLs */
initStringInfo(&create_sql);
printfStringInfo(&create_sql, table.create_table, table.dest_tablespace);

/*
* We need to split the copying of data into the new table into
* separate commands if we need any per-column storage settings.
* Otherwise use just "create table as"
*/
initStringInfo(&copy_sql);
appendStringInfoString(&copy_sql, table.copy_data);
if (!orderby)

/* if columns are altered */
if (table.alter_col_storage)
{
if (ckey != NULL)
{
/* CLUSTER mode */
appendStringInfoString(&copy_sql, " ORDER BY ");
appendStringInfoString(&copy_sql, ckey);
}
/* no data in CREATE TABLE */
appendStringInfoString(&create_sql, " WITH NO DATA");

/* else, VACUUM FULL mode (non-clustered tables) */
}
else if (!orderby[0])
{
/* VACUUM FULL mode (for clustered tables too), do nothing */
}
else
{
/* User specified ORDER BY */
appendStringInfoString(&copy_sql, " ORDER BY ");
appendStringInfoString(&copy_sql, orderby);
/* add ORDER BY in copy data */
appendStringInfoString(&copy_sql, table.copy_data);
append_order_by_command(&copy_sql, orderby, ckey);
table.copy_data = copy_sql.data;

} else {

/* copy all data in CREATE TABLE AS with ORDER BY
* allowing parallel SELECT executions */
append_order_by_command(&create_sql, orderby, ckey);
table.copy_data = NULL;
}
table.copy_data = copy_sql.data;

table.create_table = create_sql.data;

repack_one_table(&table, orderby);
}
Expand All @@ -977,6 +984,33 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
return ret;
}

static void
append_order_by_command(StringInfoData *command, const char *orderby, const char *ckey)
{
if (!orderby)

{
if (ckey != NULL)
{
/* CLUSTER mode */
appendStringInfoString(command, " ORDER BY ");
appendStringInfoString(command, ckey);
}

/* else, VACUUM FULL mode (non-clustered tables) */
}
else if (!orderby[0])
{
/* VACUUM FULL mode (for clustered tables too), do nothing */
}
else
{
/* User specified ORDER BY */
appendStringInfoString(command, " ORDER BY ");
appendStringInfoString(command, orderby);
}
}

static int
apply_log(PGconn *conn, const repack_table *table, int count)
{
Expand Down Expand Up @@ -1524,12 +1558,11 @@ repack_one_table(repack_table *table, const char *orderby)
* Before copying data to the target table, we need to set the column storage
* type if its storage type has been changed from the type default.
*/
params[0] = utoa(table->target_oid, buffer);
params[1] = table->dest_tablespace;
command(table->create_table, 2, params);
command(table->create_table, 0, NULL);
if (table->alter_col_storage)
command(table->alter_col_storage, 0, NULL);
command(table->copy_data, 0, NULL);
if (table->copy_data)
command(table->copy_data, 0, NULL);
temp_obj_num++;
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
if (table->drop_columns)
Expand Down
14 changes: 6 additions & 8 deletions lib/pg_repack.sql.in
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,15 @@ END
$$
LANGUAGE plpgsql;

CREATE FUNCTION repack.create_table(oid, name) RETURNS void AS
CREATE FUNCTION repack.create_table(oid) RETURNS text AS
$$
BEGIN
EXECUTE 'CREATE TABLE repack.table_' || $1 ||
SELECT 'CREATE TABLE repack.table_' || $1 ||
' WITH (' || repack.get_storage_param($1) || ') ' ||
' TABLESPACE ' || quote_ident($2) ||
' TABLESPACE %s' ||
' AS SELECT ' || repack.get_columns_for_create_as($1) ||
' FROM ONLY ' || repack.oid2text($1) || ' WITH NO DATA';
END
' FROM ONLY ' || repack.oid2text($1);
$$
LANGUAGE plpgsql;
LANGUAGE sql STABLE STRICT;

CREATE FUNCTION repack.create_index_type(oid, oid) RETURNS void AS
$$
Expand Down Expand Up @@ -299,7 +297,7 @@ CREATE VIEW repack.tables AS
'SELECT repack.create_log_table(' || R.oid || ')' AS create_log,
repack.get_create_trigger(R.oid, PK.indexrelid) AS create_trigger,
repack.get_enable_trigger(R.oid) as enable_trigger,
'SELECT repack.create_table($1, $2)'::text AS create_table,
repack.create_table(R.oid)::text AS create_table,
coalesce(S.spcname, S2.spcname) AS tablespace_orig,
'INSERT INTO repack.table_' || R.oid || ' SELECT ' || repack.get_columns_for_create_as(R.oid) || ' FROM ONLY ' || repack.oid2text(R.oid) AS copy_data,
repack.get_alter_col_storage(R.oid) AS alter_col_storage,
Expand Down
Loading