Skip to content

Commit

Permalink
supabase: add registered_avro_schemas table
Browse files Browse the repository at this point in the history
This table provides a global namespace for u32 schema IDs which appear
within encoded Kafka keys and values.

IDs are integrated into our catalog authorization model, so one is still
only allowed to inspect a schema to which one is read authorized, but
the table still facilitates deduplication of common schema shapes.

To register or retrieve a schema, a user must only be read-authorized to
its corresponding catalog name. This means that all readers of a
shared collection are able to re-use the same schema ID.
  • Loading branch information
jgraettinger committed Apr 12, 2024
1 parent 665c2c3 commit dbafa28
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 0 deletions.
32 changes: 32 additions & 0 deletions supabase/migrations/47_registered_avro_schemas.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@

create table registered_avro_schemas (
like internal._model including all,

avro_schema json not null,
avro_schema_md5 text generated always as (md5(trim(avro_schema::text))) stored,
catalog_name catalog_name not null,
registry_id serial unique not null
);

create index idx_registered_avro_schemas_avro_schema_md5 on registered_avro_schemas (avro_schema_md5);

comment on table registered_avro_schemas is '
Avro schemas registered with a globally unique, stable registery ID.
This is used to emulate the behavior of Confluent Schema Registry when
transcoding collection documents into Avro for use with Dekaf,
which must encode each message with an Avro schema ID (registry_id).
';

alter table registered_avro_schemas enable row level security;

create policy "Users must be read-authorized to the schema catalog name"
on registered_avro_schemas as permissive
using (exists(
select 1 from auth_roles('read') r where catalog_name ^@ r.role_prefix
));

grant select on registered_avro_schemas to authenticated;
grant insert (catalog_name, avro_schema) on registered_avro_schemas to authenticated;
grant update (updated_at) on registered_avro_schemas to authenticated;
grant usage on sequence registered_avro_schemas_registry_id_seq to authenticated;
48 changes: 48 additions & 0 deletions supabase/tests/registered_avro_schemas.test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
create function tests.test_registered_avro_schemas()
returns setof text as $$

insert into user_grants (user_id, object_role, capability) values
('11111111-1111-1111-1111-111111111111', 'aliceCo/', 'read'),
('22222222-2222-2222-2222-222222222222', 'bobCo/', 'read')
;

delete from registered_avro_schemas;
alter sequence registered_avro_schemas_registry_id_seq restart with 1;

-- Insert schemas as Alice.
select set_authenticated_context('11111111-1111-1111-1111-111111111111');

insert into registered_avro_schemas (catalog_name, avro_schema) values
('aliceCo/foo', '{"type":"record","name":"hello","fields":[{"name":"alice","type":"int"}]}'),
('aliceCo/bar', '{"type":"string"}');

-- Insert schemas as Bob.
select set_authenticated_context('22222222-2222-2222-2222-222222222222');

insert into registered_avro_schemas (catalog_name, avro_schema) values
('bobCo/baz', '{"type":"long"}'),
('bobCo/bing', '{"type":"string"}');

-- Assert schemas visible to Alice.
select set_authenticated_context('11111111-1111-1111-1111-111111111111');

select results_eq(
$i$ select catalog_name::text, registry_id, avro_schema_md5 from registered_avro_schemas order by catalog_name $i$,
$i$ values ('aliceCo/bar', 2, '2809284b6e54d0d34017715ffe5636bd'),
('aliceCo/foo', 1, '6fdea0e6b3acfece5ce250be461f6617')
$i$,
'alice schemas'
);

-- Assert schemas visible to Bob.
select set_authenticated_context('22222222-2222-2222-2222-222222222222');

select results_eq(
$i$ select catalog_name::text, registry_id, avro_schema_md5 from registered_avro_schemas order by catalog_name $i$,
$i$ values ('bobCo/baz', 3, '509e9d5641b97707c7e6f51a91334755'),
('bobCo/bing', 4, '2809284b6e54d0d34017715ffe5636bd')
$i$,
'bob schemas'
);

$$ language sql;

0 comments on commit dbafa28

Please sign in to comment.