-
Notifications
You must be signed in to change notification settings - Fork 10
/
cigration--1.1.sql
4300 lines (3762 loc) · 210 KB
/
cigration--1.1.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
-- "cigration" = citus + migration: the schema holding the shard-migration tooling.
CREATE SCHEMA cigration;

-- What cigration.citus_move_shard_placement does with the old shard placements
-- remaining on the source node after a move.
DROP TYPE IF EXISTS cigration.old_shard_placement_drop_method CASCADE;
CREATE TYPE cigration.old_shard_placement_drop_method AS ENUM (
    'none',   -- leave old shards as they are; only record them in cigration.citus_move_shard_placement_remained_old_shard
    'rename', -- move old shards into the schema "citus_move_shard_placement_recyclebin"
    'drop'    -- drop old shards on the source node
);

-- How shard data is transferred to the target node.
DROP TYPE IF EXISTS cigration.cigration_shard_transfer_mode CASCADE;
CREATE TYPE cigration.cigration_shard_transfer_mode AS ENUM (
    'force_logical', -- logical replication even without a replica identity; concurrent UPDATE/DELETE on the table fail during replication
    'block_writes'   -- COPY (writes blocked) for tables lacking a primary key or replica identity
    -- 'auto' is not offered: it would require a replica identity when logical replication
    -- is possible and fall back to the legacy behaviour otherwise.
);
-- Old shard placements left on the source node by cigration.citus_move_shard_placement,
-- together with the drop_method that applied to them.
DROP TABLE IF EXISTS cigration.citus_move_shard_placement_remained_old_shard CASCADE;
CREATE TABLE cigration.citus_move_shard_placement_remained_old_shard (
    id          SERIAL PRIMARY KEY,
    optime      timestamptz NOT NULL DEFAULT now(),
    nodename    text NOT NULL,
    nodeport    text NOT NULL, -- NOTE(review): text here but integer in the sibling tables; kept as-is for compatibility
    tablename   text NOT NULL,
    drop_method cigration.old_shard_placement_drop_method NOT NULL
);

-- cigration.citus_move_shard_placement also updates pg_dist_placement on the workers that
-- have metadata, using two-phase commit. When such an update fails, the node and the
-- prepared transaction's gid are recorded here; finish the update afterwards by calling
-- pg_catalog.citus_move_shard_placement_transaction_cleanup() manually.
DROP TABLE IF EXISTS cigration.citus_move_shard_placement_transaction CASCADE;
CREATE TABLE cigration.citus_move_shard_placement_transaction (
    id       SERIAL PRIMARY KEY,
    optime   timestamptz NOT NULL DEFAULT now(),
    nodename text NOT NULL,
    nodeport integer NOT NULL,
    gid      text NOT NULL
);

-- Shard moves (performed with citus_move_shard_placement) that failed while creating a
-- distributed table on selected workers; complete them by calling
-- cigration.cigration_create_distributed_table_move_shard_cleanup() manually.
DROP TABLE IF EXISTS cigration.create_distributed_table_move_shard_to_worker CASCADE;
CREATE TABLE cigration.create_distributed_table_move_shard_to_worker (
    id             SERIAL PRIMARY KEY,
    optime         timestamptz NOT NULL DEFAULT now(),
    shard_id       bigint NOT NULL,
    sourcenodename text NOT NULL,
    sourcenodeport integer NOT NULL,
    targetnodename text NOT NULL,
    targetnodeport integer NOT NULL
);
--
-- Custom ENUM type: lifecycle status of a migration task
--   init      - the migration task has been initialized
--   running   - the migration task is running
--   completed - the migration task has completed
--   error     - the migration task hit an error
--   canceled  - the migration task was canceled
--
DROP TYPE IF EXISTS cigration.migration_status CASCADE;
CREATE TYPE cigration.migration_status AS ENUM (
    'init',
    'running',
    'completed',
    'error',
    'canceled'
);
--
-- Event trigger (sql_drop): reject dropping any table that appears in
-- all_colocated_logicalrels of a task in cigration.pg_citus_shard_migration.
-- Raising the exception aborts the DROP statement.
--
CREATE OR REPLACE FUNCTION cigration.prevent_drop_table_in_migration_task()
RETURNS event_trigger LANGUAGE plpgsql AS $$
DECLARE
    obj record;
BEGIN
    -- Only dropped tables are relevant; other dropped object types are ignored.
    FOR obj IN SELECT * FROM pg_event_trigger_dropped_objects() WHERE object_type = 'table'
    LOOP
        -- EXISTS stops at the first matching task instead of counting all of them.
        -- The match is textual: object_identity vs. the stored names, which the column
        -- comment describes as "schema.table".
        IF EXISTS (SELECT 1
                   FROM cigration.pg_citus_shard_migration
                   WHERE obj.object_identity = ANY (all_colocated_logicalrels)) THEN
            RAISE EXCEPTION 'Can not drop table % which is in shard migration task',obj.object_identity;
        END IF;
    END LOOP;
END
$$;
CREATE EVENT TRIGGER prevent_drop_table_during_migration
ON sql_drop
EXECUTE PROCEDURE cigration.prevent_drop_table_in_migration_task();
--
-- Event trigger (ddl_command_start): reject ALTER TABLE, CREATE INDEX, ALTER INDEX and
-- DROP INDEX while any task in cigration.pg_citus_shard_migration has status 'running'.
-- NOTE: the check is database-wide. It does not look at which table the command targets,
-- so these commands are blocked for every table, not only for the tables being migrated.
--
CREATE OR REPLACE FUNCTION cigration.prevent_alter_table_in_running_migration_task()
RETURNS event_trigger
AS $$
BEGIN
    -- EXISTS stops at the first running task instead of counting all of them.
    IF EXISTS (SELECT 1
               FROM cigration.pg_citus_shard_migration
               WHERE status = 'running') THEN
        RAISE EXCEPTION '"ALTER TABLE,CREATE INDEX,ALTER INDEX,DROP INDEX" SQL is forbidden during shard migration.';
    END IF;
END;
$$ LANGUAGE plpgsql;
CREATE EVENT TRIGGER prevent_alter_table_during_migration
ON ddl_command_start WHEN TAG IN ('ALTER TABLE','CREATE INDEX','ALTER INDEX','DROP INDEX')
EXECUTE PROCEDURE cigration.prevent_alter_table_in_running_migration_task();
--
-- Sequence used to generate jobid values for migration jobs.
--
DROP SEQUENCE IF EXISTS cigration.jobid_seq;
CREATE SEQUENCE IF NOT EXISTS cigration.jobid_seq
    AS INTEGER
    START WITH 1
    INCREMENT BY 1;
--
-- Shard-migration task table: each row is one task that moves a group of colocated
-- shards from a source node to a target node; (jobid, taskid) is unique.
--
DROP TABLE IF EXISTS cigration.pg_citus_shard_migration CASCADE;
CREATE TABLE IF NOT EXISTS cigration.pg_citus_shard_migration (
    jobid                     integer NOT NULL,
    taskid                    SERIAL NOT NULL,
    source_nodename           text NOT NULL,
    source_nodeport           integer NOT NULL,
    target_nodename           text NOT NULL,
    target_nodeport           integer NOT NULL,
    status                    cigration.migration_status NOT NULL DEFAULT 'init'::cigration.migration_status,
    colocationid              integer NOT NULL,
    all_colocated_shards_id   bigint[] NOT NULL,
    all_colocated_shards_size bigint[] NOT NULL,
    all_colocated_logicalrels text[] NOT NULL,
    total_shard_count         integer NOT NULL,
    total_shard_size          bigint NOT NULL,
    create_time               timestamp NOT NULL,
    start_time                timestamp,
    end_time                  timestamp,
    error_message             text
);
-- Column comments (stored in the catalog; English gloss on the preceding line):
-- status of the task: init, running, completed, error or canceled
COMMENT ON COLUMN cigration.pg_citus_shard_migration.status IS '该task的状态,包含init,running,completed,error,canceled';
-- colocation id of the colocation group
COMMENT ON COLUMN cigration.pg_citus_shard_migration.colocationid IS '该组亲和关系的亲和ID';
-- shardid of every shard in the colocation group
COMMENT ON COLUMN cigration.pg_citus_shard_migration.all_colocated_shards_id IS '该组亲和关系中,每个分片的shardid';
-- size of every shard, in the same order as all_colocated_shards_id
COMMENT ON COLUMN cigration.pg_citus_shard_migration.all_colocated_shards_size IS '该组亲和关系中,每个分片的大小,顺序与id列一一对应';
-- identifier (schema.table) of each shard's logical table
COMMENT ON COLUMN cigration.pg_citus_shard_migration.all_colocated_logicalrels IS '该组亲和关系中,每个分片的逻辑表的识别符(shcema名.table名)';
-- total number of shards in the colocation group
COMMENT ON COLUMN cigration.pg_citus_shard_migration.total_shard_count IS '该组亲和关系中,所有的分片数总和';
-- total size of all shards in the colocation group
COMMENT ON COLUMN cigration.pg_citus_shard_migration.total_shard_size IS '该组亲和关系中,所有的分片的总大小';
CREATE UNIQUE INDEX ON cigration.pg_citus_shard_migration (jobid, taskid);
--
-- Shard-migration task history: same columns as cigration.pg_citus_shard_migration,
-- but taskid is a plain integer and status has no default; (jobid, taskid) is unique.
--
DROP TABLE IF EXISTS cigration.pg_citus_shard_migration_history CASCADE;
CREATE TABLE IF NOT EXISTS cigration.pg_citus_shard_migration_history (
    jobid                     integer NOT NULL,
    taskid                    integer NOT NULL,
    source_nodename           text NOT NULL,
    source_nodeport           integer NOT NULL,
    target_nodename           text NOT NULL,
    target_nodeport           integer NOT NULL,
    status                    cigration.migration_status NOT NULL,
    colocationid              integer NOT NULL,
    all_colocated_shards_id   bigint[] NOT NULL,
    all_colocated_shards_size bigint[] NOT NULL,
    all_colocated_logicalrels text[] NOT NULL,
    total_shard_count         integer NOT NULL,
    total_shard_size          bigint NOT NULL,
    create_time               timestamp NOT NULL,
    start_time                timestamp,
    end_time                  timestamp,
    error_message             text
);
-- Column comments (stored in the catalog; English gloss on the preceding line):
-- status of the task: init, running, completed, error or canceled
COMMENT ON COLUMN cigration.pg_citus_shard_migration_history.status IS '该task的状态,包含init,running,completed,error,canceled';
-- colocation id of the colocation group
COMMENT ON COLUMN cigration.pg_citus_shard_migration_history.colocationid IS '该组亲和关系的亲和ID';
-- shardid of every shard in the colocation group
COMMENT ON COLUMN cigration.pg_citus_shard_migration_history.all_colocated_shards_id IS '该组亲和关系中,每个分片的shardid';
-- size of every shard, in the same order as all_colocated_shards_id
COMMENT ON COLUMN cigration.pg_citus_shard_migration_history.all_colocated_shards_size IS '该组亲和关系中,每个分片的大小,顺序与id列一一对应';
-- identifier (schema.table) of each shard's logical table
COMMENT ON COLUMN cigration.pg_citus_shard_migration_history.all_colocated_logicalrels IS '该组亲和关系中,每个分片的逻辑表的识别符(shcema名.table名)';
-- total number of shards in the colocation group
COMMENT ON COLUMN cigration.pg_citus_shard_migration_history.total_shard_count IS '该组亲和关系中,所有的分片数总和';
-- total size of all shards in the colocation group
COMMENT ON COLUMN cigration.pg_citus_shard_migration_history.total_shard_size IS '该组亲和关系中,所有的分片的总大小';
CREATE UNIQUE INDEX ON cigration.pg_citus_shard_migration_history (jobid, taskid);
-- Log of every successfully executed SQL statement: the job/task it belongs to, the node
-- it was executed on, a functionid tag (presumably the issuing function; verify against
-- the writers) and the statement text.
CREATE TABLE IF NOT EXISTS cigration.pg_citus_shard_migration_sql_log (
    id               BIGSERIAL PRIMARY KEY,
    jobid            int NOT NULL,
    taskid           int NOT NULL,
    execute_nodename text NOT NULL,
    execute_nodeport integer NOT NULL,
    functionid       text NOT NULL,
    sql              text,
    execute_time     timestamp DEFAULT now()
);
-- Stub overload of create_distributed_table() taking only table_name, distribution_column
-- and (optionally) distribution_type. Per the original note, the intent is to forbid
-- calling Citus' create_distributed_table() with just those parameters.
-- The body is intentionally empty: if this overload is selected, it does nothing and
-- returns void.
-- NOTE(review): Citus' own function also has a defaulted colocate_with parameter, so a
-- 2/3-argument call matches both signatures. Whether PostgreSQL then picks this no-op or
-- reports an ambiguity error depends on its overload-resolution rules; confirm against
-- the targeted PostgreSQL/Citus versions.
-- The 4-argument calls issued over dblink elsewhere in this file cannot match this
-- 3-parameter stub.
CREATE OR REPLACE FUNCTION pg_catalog.create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type default 'hash'::citus.distribution_type
)
RETURNS void
AS $create_distributed_table$
BEGIN
END;
$create_distributed_table$ LANGUAGE plpgsql SET search_path = 'pg_catalog','public';
-- Create a hash-distributed table whose shards are placed only on the given worker nodes.
-- colocate_with is always 'none', so no mutual exclusion with shard migration is needed.
--
-- Steps:
--   1. Validate:
--      - the function runs on the coordinator;
--      - citus.shard_replication_factor is 1;
--      - each (nodenames[i], nodeports[i]) is an active primary node with shouldhaveshards.
--   2. Create the table with pg_catalog.create_distributed_table(..., 'hash', 'none').
--      This runs over a loopback dblink session, so the shards land on every eligible worker.
--   3. Move every shard sitting on an eligible worker that was NOT requested onto the
--      requested workers, round-robin, using
--      cigration.citus_move_shard_placement(..., 'drop', 'block_writes').
--      A failed move is recorded in cigration.create_distributed_table_move_shard_to_worker
--      and reported as a WARNING; call
--      cigration.cigration_create_distributed_table_move_shard_cleanup() after fixing the cause.
--
-- Parameters:
--   table_name          - table to distribute
--   distribution_column - hash distribution column
--   nodenames/nodeports - parallel, non-empty arrays naming the requested workers
CREATE OR REPLACE FUNCTION cigration.cigration_create_distributed_table_with_placement(table_name regclass,
    distribution_column text,
    nodenames text[],
    nodeports integer[]
)
RETURNS void
AS $cigration_create_distributed_table_with_placement$
DECLARE
    source_node_count integer;
    source_nodenames text[];
    source_nodeports integer[];
    target_node_count integer;
    target_nodenames text[];
    target_nodeports integer[];
    shardids bigint[];
    shardids_count integer;
    itmp integer;
    jtmp integer;
    icurrentnode_port integer;
    error_msg text;
    iarraytmp integer := 1;
BEGIN
    RAISE DEBUG 'BEGIN create distributed table:%(%)', table_name, distribution_column;
    -- must run on the coordinator (local groupid 0; -1 when pg_dist_node is empty)
    IF (SELECT CASE WHEN (select count(*) from pg_dist_node)>0 THEN (select groupid from pg_dist_local_group) ELSE -1 END) <> 0 THEN
        RAISE 'Function cigration.cigration_create_distributed_table_with_placement could only be executed on coordinate node';
    END IF;
    -- check citus.shard_replication_factor
    IF (select setting from pg_settings where name='citus.shard_replication_factor') <> '1' THEN
        RAISE 'citus.shard_replication_factor must be 1';
    END IF;
    -- nodenames / nodeports must be given
    IF nodenames IS NULL OR nodeports IS NULL THEN
        RAISE $$nodenames OR nodeports can not be null$$;
    END IF;
    -- array_length() returns NULL (not 0) for an empty array. The plain "< 1" / "<>"
    -- comparisons therefore evaluated to NULL and let empty input through, so coalesce to 0.
    target_node_count := coalesce(array_length(nodenames, 1), 0);
    IF target_node_count < 1 OR target_node_count <> coalesce(array_length(nodeports, 1), 0) THEN
        RAISE 'The length of nodenames or nodeports is invalid';
    END IF;
    -- every requested worker must be an active primary node that may hold shards
    FOR itmp IN 1..target_node_count LOOP
        SELECT count(*)
        INTO jtmp
        FROM pg_dist_node
        WHERE nodename = nodenames[itmp] AND
              nodeport = nodeports[itmp] AND
              shouldhaveshards = 't' AND
              isactive = 't' AND
              noderole = 'primary';
        IF jtmp <> 1 THEN
            RAISE 'Specified worker %:% is invalid.',
                  nodenames[itmp], nodeports[itmp];
        END IF;
    END LOOP;
    target_nodenames := nodenames;
    target_nodeports := nodeports;
    -- eligible workers that were NOT requested; their shards are moved away below
    SELECT count(*), array_agg(n.nodename), array_agg(n.nodeport)
    INTO STRICT source_node_count, source_nodenames, source_nodeports
    FROM pg_dist_node AS n
    WHERE n.shouldhaveshards = 't' AND
          n.isactive = 't' AND
          n.noderole = 'primary' AND
          NOT EXISTS (SELECT 1
                      FROM unnest(nodenames, nodeports) AS req(req_nodename, req_nodeport)
                      WHERE req.req_nodename = n.nodename AND
                            req.req_nodeport = n.nodeport);
    RAISE DEBUG 'create distributed table:table_name(%) distribution_column(%)',
        table_name, distribution_column;
    -- get the current node port
    SELECT setting INTO icurrentnode_port FROM pg_settings where name='port';
    -- (re)create the loopback dblink connection
    PERFORM dblink_disconnect(con)
    FROM (select unnest(a) con from dblink_get_connections() a)b
    WHERE con = 'citus_create_distributed_table_con';
    PERFORM dblink_connect('citus_create_distributed_table_con',
        format('host=%s port=%s user=%s dbname=%s',
            '127.0.0.1',
            icurrentnode_port,
            current_user,
            current_database()));
    BEGIN
        -- copy search_path, shard_count and shard_replication_factor into the dblink session
        PERFORM * FROM dblink('citus_create_distributed_table_con',
            format($$SET search_path = %s;SET citus.shard_count = %s;SET citus.shard_replication_factor = %s;$$,
                (SELECT setting FROM pg_settings where name='search_path'),
                (SELECT setting FROM pg_settings where name='citus.shard_count'),
                (SELECT setting FROM pg_settings where name='citus.shard_replication_factor'))
        ) AS t(result_record text);
        -- Create the distributed table. %L quotes each argument as a literal, so names
        -- containing single quotes (e.g. quoted identifiers) cannot break the statement.
        PERFORM * FROM dblink('citus_create_distributed_table_con',
            format($$SELECT pg_catalog.create_distributed_table(%L, %L, %L, %L)$$,
                table_name, distribution_column, 'hash', 'none')
        ) AS t(result_record text);
        -- Move shards off the non-requested workers, round-robin over the requested ones.
        -- When source_node_count is 0 the loop runs zero times and we fall through to the
        -- cleanup below. (An early RETURN here used to skip dblink_disconnect.)
        FOR itmp IN 1..source_node_count LOOP
            RAISE DEBUG 'fetch shardid list with %:%:%', source_nodenames[itmp], source_nodeports[itmp],
                table_name;
            SELECT count(*), array_agg(shardid)
            INTO STRICT shardids_count, shardids
            FROM pg_dist_placement
            WHERE groupid IN
                    (SELECT groupid FROM pg_dist_node WHERE nodename = source_nodenames[itmp] AND
                                                            nodeport = source_nodeports[itmp])
                  AND shardid IN
                    (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = table_name);
            -- no shard of this table on this worker
            IF shardids_count = 0 THEN
                CONTINUE;
            END IF;
            RAISE DEBUG 'move shard placement %:%', source_nodenames[itmp], source_nodeports[itmp];
            FOR jtmp IN 1..shardids_count LOOP
                BEGIN
                    iarraytmp := iarraytmp % target_node_count + 1;
                    PERFORM * FROM dblink('citus_create_distributed_table_con',
                        format($$SELECT cigration.citus_move_shard_placement(%s, %L, %s, %L, %s, 'drop', 'block_writes')$$,
                            shardids[jtmp], source_nodenames[itmp], source_nodeports[itmp],
                            target_nodenames[iarraytmp], target_nodeports[iarraytmp])
                    ) AS t(result_record text);
                EXCEPTION WHEN QUERY_CANCELED or OTHERS THEN
                    -- record the failed move so the cleanup function can retry it
                    INSERT INTO cigration.create_distributed_table_move_shard_to_worker
                        (shard_id, sourcenodename, sourcenodeport, targetnodename, targetnodeport)
                    VALUES (shardids[jtmp], source_nodenames[itmp], source_nodeports[itmp],
                            target_nodenames[iarraytmp], target_nodeports[iarraytmp]);
                    GET STACKED DIAGNOSTICS error_msg = MESSAGE_TEXT;
                    RAISE WARNING 'failed to move shard[%] from [%:%] to [%:%] in source:%.Please execute function cigration.cigration_create_distributed_table_move_shard_cleanup() manually after shooting the trouble.',
                        shardids[jtmp], source_nodenames[itmp], source_nodeports[itmp],
                        target_nodenames[iarraytmp], target_nodeports[iarraytmp],
                        error_msg;
                END;
            END LOOP;
        END LOOP;
    EXCEPTION WHEN QUERY_CANCELED or OTHERS THEN
        -- best-effort disconnect, then re-raise the original error
        BEGIN
            PERFORM dblink_disconnect('citus_create_distributed_table_con');
        EXCEPTION WHEN QUERY_CANCELED or OTHERS THEN
            GET STACKED DIAGNOSTICS error_msg = MESSAGE_TEXT;
            RAISE WARNING 'failed to call dblink_disconnect:%', error_msg;
        END;
        RAISE;
    END;
    -- cleanup
    BEGIN
        PERFORM dblink_disconnect('citus_create_distributed_table_con');
    EXCEPTION WHEN QUERY_CANCELED or OTHERS THEN
        GET STACKED DIAGNOSTICS error_msg = MESSAGE_TEXT;
        RAISE WARNING 'failed to call dblink_disconnect:%', error_msg;
    END;
END;
$cigration_create_distributed_table_with_placement$ LANGUAGE plpgsql SET search_path = 'cigration','public';
-- Create a distributed table; the interface is fully compatible with create_distributed_table().
-- The function deliberately has no SET search_path clause. The session's search_path is
-- copied into the dblink session so that the table named by colocate_with resolves exactly
-- as it would for the caller.
-- For hash tables with colocate_with <> 'none', the function first:
--   - takes a transaction-level advisory lock, and fails if cigration APIs are already
--     running concurrently;
--   - refuses to run while any task exists in cigration.pg_citus_shard_migration.
CREATE OR REPLACE FUNCTION cigration.cigration_create_distributed_table(table_name regclass,
    distribution_column text,
    distribution_type citus.distribution_type DEFAULT 'hash',
    colocate_with text DEFAULT 'default'
)
RETURNS void
AS $cigration_create_distributed_table$
DECLARE
    icurrentnode_port integer;
    error_msg text;
BEGIN
    RAISE DEBUG 'BEGIN create distributed table:%(%)', table_name, distribution_column;
    -- must run on the coordinator (local groupid 0; -1 when pg_dist_node is empty)
    IF (SELECT CASE WHEN (select count(*) from pg_dist_node)>0 THEN (select groupid from pg_dist_local_group) ELSE -1 END) <> 0 THEN
        RAISE 'Function cigration.cigration_create_distributed_table could only be executed on coordinate node';
    END IF;
    -- colocated hash tables must not be created while shard migration tasks exist
    IF distribution_type = 'hash' AND colocate_with <> 'none' THEN
        IF NOT (select pg_try_advisory_xact_lock('cigration.pg_citus_shard_migration'::regclass::int)) THEN
            RAISE EXCEPTION 'Can not call cigration api concurrently.';
        END IF;
        IF EXISTS (SELECT 1 FROM cigration.pg_citus_shard_migration) THEN
            RAISE 'create colocated hash distributed table is forbidden during the shard migration.'
                USING HINT = 'you could check cigration.pg_citus_shard_migration for existing shard migration tasks.';
        END IF;
    END IF;
    -- get the current node port
    SELECT setting INTO icurrentnode_port FROM pg_settings where name='port';
    -- According to the original authors, calling create_distributed_table() inside a
    -- transaction block (a function or a multi-statement query) with a table name longer
    -- than 55 bytes triggers a distributed deadlock. It is therefore called through a
    -- loopback dblink connection.
    PERFORM dblink_disconnect(con)
    FROM (select unnest(a) con from dblink_get_connections() a)b
    WHERE con = 'citus_create_distributed_table_con';
    PERFORM dblink_connect('citus_create_distributed_table_con',
        format('host=%s port=%s user=%s dbname=%s',
            '127.0.0.1',
            icurrentnode_port,
            current_user,
            current_database()));
    BEGIN
        -- copy search_path, shard_count and shard_replication_factor into the dblink session
        PERFORM * FROM dblink('citus_create_distributed_table_con',
            format($$SET search_path = %s;SET citus.shard_count = %s;SET citus.shard_replication_factor = %s;$$,
                (SELECT setting FROM pg_settings where name='search_path'),
                (SELECT setting FROM pg_settings where name='citus.shard_count'),
                (SELECT setting FROM pg_settings where name='citus.shard_replication_factor'))
        ) AS t(result_record text);
        -- Create the distributed table. %L quotes each argument as a literal, so names
        -- containing single quotes (e.g. quoted identifiers) cannot break the statement.
        PERFORM * FROM dblink('citus_create_distributed_table_con',
            format($$SELECT pg_catalog.create_distributed_table(%L, %L, %L, %L)$$,
                table_name, distribution_column, distribution_type, colocate_with)
        ) AS t(result_record text);
    EXCEPTION WHEN QUERY_CANCELED or OTHERS THEN
        -- best-effort disconnect, then re-raise the original error
        BEGIN
            PERFORM dblink_disconnect('citus_create_distributed_table_con');
        EXCEPTION WHEN QUERY_CANCELED or OTHERS THEN
            GET STACKED DIAGNOSTICS error_msg = MESSAGE_TEXT;
            RAISE WARNING 'failed to call dblink_disconnect:%', error_msg;
        END;
        RAISE;
    END;
    -- cleanup
    BEGIN
        PERFORM dblink_disconnect('citus_create_distributed_table_con');
    EXCEPTION WHEN QUERY_CANCELED or OTHERS THEN
        GET STACKED DIAGNOSTICS error_msg = MESSAGE_TEXT;
        RAISE WARNING 'failed to call dblink_disconnect:%', error_msg;
    END;
END;
$cigration_create_distributed_table$ LANGUAGE plpgsql;
-- Retry the shard moves recorded in cigration.create_distributed_table_move_shard_to_worker.
-- Moves are retried in id order. A successful move deletes its record. A failed move is
-- reported as a WARNING and its record is kept, so the function can simply be called again.
CREATE OR REPLACE FUNCTION cigration.cigration_create_distributed_table_move_shard_cleanup()
RETURNS void
AS $cigration_create_distributed_table_move_shard_cleanup$
DECLARE
    pending record;
    error_msg text;
BEGIN
    FOR pending IN SELECT id, shard_id, sourcenodename, sourcenodeport, targetnodename, targetnodeport
                   FROM cigration.create_distributed_table_move_shard_to_worker
                   ORDER BY id
    LOOP
        BEGIN
            -- Schema-qualified static call. pg_catalog is searched before the function's
            -- search_path, and Citus provides its own pg_catalog.citus_move_shard_placement,
            -- so the unqualified dynamic call relied on overload resolution. Passing the values
            -- as parameters also avoids hand-quoting node names into SQL text.
            PERFORM cigration.citus_move_shard_placement(pending.shard_id,
                                                         pending.sourcenodename,
                                                         pending.sourcenodeport,
                                                         pending.targetnodename,
                                                         pending.targetnodeport,
                                                         'drop',
                                                         'block_writes');
            DELETE FROM cigration.create_distributed_table_move_shard_to_worker WHERE id = pending.id;
        EXCEPTION WHEN QUERY_CANCELED or OTHERS THEN
            GET STACKED DIAGNOSTICS error_msg = MESSAGE_TEXT;
            RAISE WARNING 'failed to move shard[%] from[%:%] to [%:%] in source:%.', pending.shard_id, pending.sourcenodename, pending.sourcenodeport, pending.targetnodename, pending.targetnodeport, error_msg;
        END;
    END LOOP;
    RETURN;
END;
$cigration_create_distributed_table_move_shard_cleanup$ LANGUAGE plpgsql SET search_path = 'cigration','public';
-- Move this shard and all of its colocated shards from the source node to the target node.
-- drop_method defines how the old shards on the source node are handled; the default, 'none', does not block SELECT.
-- Old shards that still have to be dropped later are recorded in table cigration.citus_move_shard_placement_remained_old_shard.
CREATE OR REPLACE FUNCTION cigration.citus_move_shard_placement(shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
drop_method cigration.old_shard_placement_drop_method DEFAULT 'none',
shard_transfer_mode cigration.cigration_shard_transfer_mode default 'force_logical',
consistent_check bool default true,
user_lock_timeout integer DEFAULT 70000,
data_sync_timeout integer DEFAULT 3600)
RETURNS void
AS $citus_move_shard_placement$
DECLARE
source_node_id integer;
source_group_id integer;
target_node_id integer;
target_group_id integer;
logical_relid regclass;
part_method text;
source_active_shard_id_array bigint[];
source_bad_shards_string text;
target_exist_shard_id_array bigint[];
target_shard_tables_with_data text;
logical_relid_array regclass[];
inh_logical_relid_array regclass[];
logical_schema_array text[];
logical_table_array text[];
colocated_table_count integer;
inh_colocated_table_count integer;
noinh_shard_fulltablename_count integer;
i integer;
logical_schema text;
shard_id_array bigint[];
shard_fulltablename_array text[];
inh_shard_fulltablename_array text[];
noinh_shard_fulltablename_array text[];
tmp_shard_id bigint;
sub_rel_count_srsubid bigint;
sub_lag numeric;
source_wal_lsn pg_lsn;
error_msg text;
source_tables_data_count bigint;
target_tables_data_count bigint;
metadata_node_count integer;
metadata_nodename_array text[];
metadata_nodeport_array integer[];
j integer;
icurrentgroupid integer;
i_source_node_port integer;
i_target_node_port integer;
t_start_sync_time timestamptz;
pub_created boolean := false;
sub_created boolean := false;
table_created boolean := false;
dblink_created boolean := false;
need_record_source_shard boolean := false;
BEGIN
--check current node is cn?
SELECT CASE WHEN (select count(*) from pg_dist_node)>0 THEN (select groupid from pg_dist_local_group) ELSE -1 END
INTO icurrentgroupid;
IF icurrentgroupid <> 0 THEN
RAISE 'Function citus_move_shard_placement could only be executed on coordinate node';
END IF;
-- check and get node id of target node and target node. Will fail for invalid input.
IF source_node_name = target_node_name AND source_node_port = target_node_port THEN
RAISE 'target node can not be same as source node';
END IF;
SELECT nodeid, groupid
INTO source_node_id,source_group_id
FROM pg_dist_node
WHERE nodename = source_node_name AND
nodeport = source_node_port AND
isactive = 't' AND
noderole = 'primary';
IF source_node_id is NULL OR source_group_id is NULL THEN
RAISE 'invalid source node %:%', source_node_name, source_node_port;
END IF;
SELECT nodeid, groupid
INTO target_node_id,target_group_id
FROM pg_dist_node
WHERE nodename = target_node_name AND
nodeport = target_node_port AND
isactive = 't' AND
noderole = 'primary';
IF target_node_id is NULL OR target_group_id is NULL THEN
RAISE 'invalid target node %:%', target_node_name, target_node_port;
END IF;
-- check if the shard is hash shard
SELECT logicalrelid
INTO logical_relid
FROM pg_dist_shard
WHERE shardid = shard_id;
IF logical_relid is NULL THEN
RAISE 'shard % does not exist', shard_id;
END IF;
SELECT partmethod
INTO part_method
FROM pg_dist_partition
WHERE logicalrelid = logical_relid;
IF part_method is NULL OR part_method <> 'h' THEN
RAISE '% is not a hash shard', shard_id;
END IF;
-- get all colocated tables and there shard id
SELECT count(logicalrelid), array_agg(logicalrelid)
INTO STRICT colocated_table_count,logical_relid_array
FROM pg_dist_partition
WHERE colocationid=(select colocationid from pg_dist_partition where logicalrelid=logical_relid);
SELECT array_agg(nspname), array_agg(relname)
INTO STRICT logical_schema_array, logical_table_array
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.oid = any(logical_relid_array);
SELECT array_agg(shardid), array_agg(shard_name(logicalrelid, shardid))
INTO STRICT shard_id_array, shard_fulltablename_array
FROM (SELECT shardid, logicalrelid
FROM pg_dist_shard
WHERE logicalrelid = any(logical_relid_array) AND
(shardminvalue,shardmaxvalue)=(select shardminvalue,shardmaxvalue from pg_dist_shard where shardid=shard_id)
ORDER BY shardid ASC) AS tbl;
-- check inherit information
IF shard_transfer_mode = 'force_logical' THEN
-- inherit table list
SELECT count(logicalrelid), array_agg(logicalrelid)
INTO STRICT inh_colocated_table_count,inh_logical_relid_array
FROM pg_dist_partition p , pg_class c
WHERE colocationid=(select colocationid from pg_dist_partition where logicalrelid=logical_relid) AND
p.logicalrelid=c.oid AND c.relhassubclass='t';
-- all of the inherit table are not partition table?
SELECT count(logicalrelid)
INTO STRICT inh_colocated_table_count
FROM pg_dist_partition p , pg_class c
WHERE p.logicalrelid = any(inh_logical_relid_array) AND
p.logicalrelid=c.oid AND c.relkind<>'p';
IF inh_colocated_table_count > 0 THEN
RAISE 'Inherit table are not supported in function citus_move_shard_placement';
END IF;
-- get partition table's full name
SELECT array_agg(shard_name(logicalrelid, shardid))
INTO STRICT inh_shard_fulltablename_array
FROM (SELECT shardid, logicalrelid
FROM pg_dist_shard
WHERE logicalrelid = any(inh_logical_relid_array) AND
(shardminvalue,shardmaxvalue)=(select shardminvalue,shardmaxvalue from pg_dist_shard where shardid=shard_id)
ORDER BY shardid ASC) AS tbl;
-- get colocated tables without partition tables
SELECT count(table_name), array_agg(table_name)
INTO STRICT noinh_shard_fulltablename_count, noinh_shard_fulltablename_array
FROM
((select table_name from unnest(shard_fulltablename_array) table_name)
EXCEPT
(select table_name from unnest(inh_shard_fulltablename_array) table_name)) AS tb1;
END IF;
-- check if all colocated shards are valid
SELECT array_agg(shardid)
INTO source_active_shard_id_array
FROM (SELECT shardid
FROM pg_dist_placement
WHERE shardid = any(shard_id_array) AND
groupid = source_group_id AND
shardstate = 1
ORDER BY shardid ASC) AS tb1;
IF source_active_shard_id_array is NULL THEN
RAISE 'shard % in source node do not exist or invalid', shard_id_array;
ELSIF source_active_shard_id_array <> shard_id_array THEN
SELECT string_agg(shardid::text,',')
INTO STRICT source_bad_shards_string
FROM unnest(shard_id_array) t(shardid)
WHERE shardid <> any(source_active_shard_id_array);
RAISE 'shard % in source node do not exist or invalid', source_bad_shards_string;
END IF;
SELECT array_agg(shardid)
INTO target_exist_shard_id_array
FROM pg_dist_placement
WHERE shardid = shard_id AND
groupid = target_group_id;
IF target_exist_shard_id_array is not NULL THEN
RAISE 'shard % already exist in target node', target_exist_shard_id_array;
END IF;
RAISE NOTICE 'BEGIN move shards(%) from %:% to %:%',
array_to_string(shard_id_array,','),
source_node_name, source_node_port,
target_node_name, target_node_port;
--fetch worker which has metadata
SELECT count(*), array_agg(nodename), array_agg(nodeport)
INTO STRICT metadata_node_count, metadata_nodename_array, metadata_nodeport_array
FROM pg_dist_node
WHERE hasmetadata='t' AND isactive='t' AND noderole='primary';
-- get source and target pg port
SELECT port
INTO i_source_node_port
FROM dblink(format('host=%s port=%s user=%s dbname=%s', source_node_name, source_node_port, current_user, current_database()),
$$SELECT setting FROM pg_settings WHERE name = 'port'$$) AS tb(port integer);
SELECT port
INTO i_target_node_port
FROM dblink(format('host=%s port=%s user=%s dbname=%s', target_node_name, target_node_port, current_user, current_database()),
$$SELECT setting FROM pg_settings WHERE name = 'port'$$) AS tb(port integer);
-- create dblink connection on cn
PERFORM dblink_disconnect(con)
FROM (select unnest(a) con from dblink_get_connections() a)b
WHERE con like 'citus_move_shard_placement_%';
PERFORM dblink_connect('citus_move_shard_placement_source_con',
format('host=%s port=%s user=%s dbname=%s',
source_node_name,
i_source_node_port,
current_user,
current_database()));
dblink_created := true;
PERFORM dblink_connect('citus_move_shard_placement_target_con',
format('host=%s port=%s user=%s dbname=%s',
target_node_name,
i_target_node_port,
current_user,
current_database()));
-- set lock timeout on cn
EXECUTE format('SET lock_timeout=%s',user_lock_timeout);
-- lock tables from executing DDL
FOR i IN 1..colocated_table_count LOOP
RAISE NOTICE '[%/%] LOCK TABLE %.% IN SHARE UPDATE EXCLUSIVE MODE ...',
i, colocated_table_count, logical_schema_array[i], logical_table_array[i];
EXECUTE format('LOCK TABLE %I.%I IN SHARE UPDATE EXCLUSIVE MODE',
logical_schema_array[i],
logical_table_array[i]);
END LOOP;
-- create dblink connection on workers which have metadata, and then lock tables from executing DDL
IF metadata_node_count >=1 THEN
FOR j IN 1..metadata_node_count LOOP
PERFORM dblink_connect(format('citus_move_shard_placement_%s_%s_con',
metadata_nodename_array[j],
metadata_nodeport_array[j]),
format('host=%s port=%s user=%s dbname=%s',
metadata_nodename_array[j],
metadata_nodeport_array[j],
current_user,
current_database()));
--begin transaction on each worker which has metadata
PERFORM dblink_exec(format('citus_move_shard_placement_%s_%s_con',
metadata_nodename_array[j],
metadata_nodeport_array[j]),
'BEGIN');
--set lock timeout on workers which have metadata
PERFORM dblink_exec(format('citus_move_shard_placement_%s_%s_con',
metadata_nodename_array[j],
metadata_nodeport_array[j]),
format('set lock_timeout=%s',user_lock_timeout));
--lock tables from executing DDL on citus_move_shard_placement_node
-- FOR i IN 1..colocated_table_count LOOP
-- RAISE NOTICE '[citus_move_shard_placement_%_%_con][%/%] LOCK TABLE %.% IN SHARE UPDATE EXCLUSIVE MODE ...',
-- metadata_nodename_array[j],
-- metadata_nodeport_array[j],
-- i, colocated_table_count,
-- logical_schema_array[i], logical_table_array[i];
-- PERFORM dblink_exec(format('citus_move_shard_placement_%s_%s_con',
-- metadata_nodename_array[j],
-- metadata_nodeport_array[j]),
-- format('LOCK TABLE %I.%I IN SHARE UPDATE EXCLUSIVE MODE',
-- logical_schema_array[i],
-- logical_table_array[i]));
--END LOOP;
END LOOP;
END IF;
BEGIN
IF shard_transfer_mode = 'force_logical' THEN
-- CREATE PUBLICATION in source node
RAISE NOTICE 'CREATE PUBLICATION in source node %:%', source_node_name, source_node_port;
PERFORM dblink_exec('citus_move_shard_placement_source_con',
'DROP PUBLICATION IF EXISTS citus_move_shard_placement_pub CASCADE');
PERFORM dblink_exec('citus_move_shard_placement_source_con',
format('CREATE PUBLICATION citus_move_shard_placement_pub
FOR TABLE %s',
(select string_agg(table_name,',') from unnest(noinh_shard_fulltablename_array) table_name)));
pub_created := true;
END IF;
-- CREATE SCHEMA IF NOT EXISTS in target node
FOR logical_schema IN select distinct unnest(logical_schema_array) LOOP
PERFORM dblink_exec('citus_move_shard_placement_target_con',
format('CREATE SCHEMA IF NOT EXISTS %I',
logical_schema));
END LOOP;
-- create shard table in the target node
RAISE NOTICE 'create shard table in the target node %:%', target_node_name, target_node_port;
EXECUTE format($sql$COPY (select '') to PROGRAM $$pg_dump "host=%s port=%s user=%s dbname=%s" -s -t %s | psql "host=%s port=%s user=%s dbname=%s"$$ $sql$,
source_node_name,
i_source_node_port,
current_user,
current_database(),
(select string_agg(format($a$'%s'$a$,table_name),' -t ') from unnest(shard_fulltablename_array) table_name),
target_node_name,
i_target_node_port,
current_user,
current_database());
SELECT table_name
INTO target_shard_tables_with_data
FROM dblink('citus_move_shard_placement_target_con',
format($$ select string_agg(table_name,',') table_name from ((select tableoid::regclass::text table_name from %s limit 1))a $$,
(select string_agg(table_name,' limit 1) UNION all (select tableoid::regclass::text table_name from ')
from unnest(shard_fulltablename_array) table_name))
) as a(table_name text);
IF target_shard_tables_with_data is not NULL THEN
RAISE 'shard tables(%) with data has exists in target node', target_shard_tables_with_data;
END IF;
table_created := true;
IF shard_transfer_mode = 'force_logical' THEN
-- CREATE SUBSCRIPTION on target node
RAISE NOTICE 'CREATE SUBSCRIPTION on target node %:%', target_node_name, target_node_port;
PERFORM dblink_exec('citus_move_shard_placement_target_con',
'DROP SUBSCRIPTION IF EXISTS citus_move_shard_placement_sub CASCADE');
PERFORM dblink_exec('citus_move_shard_placement_target_con',
format($$CREATE SUBSCRIPTION citus_move_shard_placement_sub
CONNECTION 'host=%s port=%s user=%s dbname=%s'
PUBLICATION citus_move_shard_placement_pub$$,
source_node_name,
i_source_node_port,
current_user,
current_database()));
sub_created := true;
-- wait shard data init sync
RAISE NOTICE 'wait for init data sync...';
SELECT clock_timestamp() INTO t_start_sync_time;
LOOP
SELECT count_srsubid
INTO STRICT sub_rel_count_srsubid
FROM dblink('citus_move_shard_placement_target_con',
$$SELECT count(srsubid) count_srsubid from pg_subscription, pg_subscription_rel
WHERE pg_subscription.oid=pg_subscription_rel.srsubid AND
pg_subscription.subname = 'citus_move_shard_placement_sub' AND
(pg_subscription_rel.srsubstate = 's' OR pg_subscription_rel.srsubstate = 'r')$$
) AS t(count_srsubid int);
IF sub_rel_count_srsubid = noinh_shard_fulltablename_count THEN
EXIT;
ELSE
IF EXTRACT(EPOCH from clock_timestamp() - t_start_sync_time) > data_sync_timeout THEN
RAISE 'init data sync timeout.';
END IF;
PERFORM pg_sleep(1);
END IF;
END LOOP;
END IF;
-- lock tables from executing SQL
FOR i IN 1..colocated_table_count LOOP
IF drop_method = 'none' THEN
-- block all sql except for SELECT
RAISE NOTICE '[%/%] LOCK TABLE %.% IN EXCLUSIVE MODE ...',
i, colocated_table_count, logical_schema_array[i], logical_table_array[i];
EXECUTE format('LOCK TABLE %I.%I IN EXCLUSIVE MODE',
logical_schema_array[i],
logical_table_array[i]);
ELSE
-- block all sql
RAISE NOTICE '[%/%] LOCK TABLE %.% ...',
i, colocated_table_count, logical_schema_array[i], logical_table_array[i];
EXECUTE format('LOCK TABLE %I.%I',
logical_schema_array[i],
logical_table_array[i]);
END IF;
END LOOP;
--lock tables from executing SQL on workers which have metadata
IF metadata_node_count >=1 THEN
FOR j IN 1..metadata_node_count LOOP
--lock tables from executing DDL on citus_move_shard_placement_node
FOR i IN 1..colocated_table_count LOOP
IF drop_method = 'none' THEN
RAISE NOTICE '[citus_move_shard_placement_%_%_con][%/%] LOCK TABLE %.% IN EXCLUSIVE MODE ...',
metadata_nodename_array[j],
metadata_nodeport_array[j],
i, colocated_table_count,
logical_schema_array[i], logical_table_array[i];
PERFORM dblink_exec(format('citus_move_shard_placement_%s_%s_con',
metadata_nodename_array[j],
metadata_nodeport_array[j]),
format('LOCK TABLE %I.%I IN EXCLUSIVE MODE',
logical_schema_array[i],
logical_table_array[i]));
ELSE
RAISE NOTICE '[citus_move_shard_placement_%_%_con][%/%] LOCK TABLE %.% ...',
metadata_nodename_array[j],
metadata_nodeport_array[j],
i, colocated_table_count,
logical_schema_array[i], logical_table_array[i];
PERFORM dblink_exec(format('citus_move_shard_placement_%s_%s_con',
metadata_nodename_array[j],
metadata_nodeport_array[j]),
format('LOCK TABLE %I.%I',
logical_schema_array[i],
logical_table_array[i]));
END IF;
END LOOP;
END LOOP;
END IF;
IF shard_transfer_mode = 'force_logical' THEN
-- wait shard data sync
RAISE NOTICE 'wait for data sync...';
SELECT clock_timestamp() INTO t_start_sync_time;
SELECT sourcewallsn
INTO STRICT source_wal_lsn
FROM dblink('citus_move_shard_placement_source_con',
$$select pg_current_wal_lsn()$$
) AS t(sourcewallsn pg_lsn);
LOOP
SELECT lag
INTO STRICT sub_lag
FROM dblink('citus_move_shard_placement_target_con',
format($$select pg_wal_lsn_diff('%s',latest_end_lsn)
FROM pg_stat_subscription
WHERE subname = 'citus_move_shard_placement_sub' AND latest_end_lsn is not NULL$$,
source_wal_lsn::text)
) AS t(lag numeric);
IF sub_lag <= 0 THEN
EXIT;
ELSE
IF EXTRACT(EPOCH from clock_timestamp() - t_start_sync_time) > data_sync_timeout THEN
RAISE 'data sync timeout.';
END IF;
PERFORM pg_sleep(1);
END IF;
END LOOP;
ELSIF shard_transfer_mode = 'block_writes' THEN
-- COPY DATA FROM SOURCENODE TO TARGETNODE
FOR i IN 1..colocated_table_count LOOP
RAISE NOTICE '[%/%]copy table[%] data from source[%:%] to target[%:%]...',
i,colocated_table_count,shard_fulltablename_array[i],
source_node_name,i_source_node_port,target_node_name,i_target_node_port;
EXECUTE format($sql$COPY (select '') to PROGRAM $$psql "host=%s port=%s user=%s dbname=%s" -Atc 'copy %s to stdout' | psql "host=%s port=%s user=%s dbname=%s" -Atc 'copy %s from stdout'$$ $sql$,
source_node_name,
i_source_node_port,