
Add BlobCompressor to compress blob and text fields on the fly #221

Draft · dkropachev wants to merge 2 commits into master from dk/add-auto-compression-for-blob
Conversation

@dkropachev (Collaborator) commented Jul 22, 2024

Closes #218

Column-based blob compression

General idea

If a client stores big blobs of data, compressing the data that goes into that field will reduce the footprint of select/update operations on both the network and the server.

Interoperability issues

  1. Data will be readable only by a properly configured gocql driver; cqlsh and other drivers won't be able to read it

Possible implementations of serialization/deserialization

1. Global variable + hack into marshalVarchar/unmarshalVarchar

// marshal.go

type blobCompressor interface {
	Compress([]byte) ([]byte, error)
	Decompress([]byte) ([]byte, error)
}

var MyGlobalCompressor blobCompressor

func unmarshalVarchar(info TypeInfo, data []byte, value interface{}) (err error) {
	if MyGlobalCompressor != nil {
		data, err = MyGlobalCompressor.Decompress(data)
		if err != nil {
			return err
		}
	}
	return unmarshalVarcharRaw(info, data, value)
}

func marshalVarchar(info TypeInfo, value interface{}) ([]byte, error) {
	data, err := marshalVarcharRaw(info, value)
	if err != nil {
		return nil, err
	}
	if MyGlobalCompressor != nil {
		return MyGlobalCompressor.Compress(data)
	}
	return data, nil
}

Pros:

  1. Easy to use
  2. Easy to implement

Cons:

  1. As dirty as it gets
  2. No control over cluster/session/column; compression is either globally on or globally off
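For concreteness, an implementation of the blobCompressor interface might look like the following sketch. It loosely mirrors the lz4 helper discussed in the review below; github.com/pierrec/lz4/v4 is the package gocql already uses for frame compression, but the package name, prefix/limit scheme, and all identifiers here are assumptions, not the PR's actual code.

// lz4blob.go - hypothetical helper package
package lz4blob

import (
	"bytes"
	"encoding/binary"
	"fmt"

	"github.com/pierrec/lz4/v4"
)

// LZ4BlobCompressor compresses values longer than limit and marks them
// with a caller-chosen prefix followed by the uncompressed length.
type LZ4BlobCompressor struct {
	prefix []byte
	limit  int
}

func New(prefix []byte, limit int) *LZ4BlobCompressor {
	return &LZ4BlobCompressor{prefix: prefix, limit: limit}
}

func (c *LZ4BlobCompressor) Compress(data []byte) ([]byte, error) {
	if len(data) < c.limit {
		return data, nil // too small to be worth compressing
	}
	buf := make([]byte, len(c.prefix)+4+lz4.CompressBlockBound(len(data)))
	copy(buf, c.prefix)
	// Record the uncompressed length so Decompress can size its buffer.
	binary.BigEndian.PutUint32(buf[len(c.prefix):], uint32(len(data)))
	var z lz4.Compressor
	n, err := z.CompressBlock(data, buf[len(c.prefix)+4:])
	if err != nil {
		// dst is CompressBlockBound-sized, so this is not expected.
		return nil, err
	}
	return buf[:len(c.prefix)+4+n], nil
}

func (c *LZ4BlobCompressor) Decompress(data []byte) ([]byte, error) {
	if len(data) < len(c.prefix)+4 || !bytes.HasPrefix(data, c.prefix) {
		return data, nil // not written by this compressor; pass through
	}
	size := binary.BigEndian.Uint32(data[len(c.prefix):])
	out := make([]byte, size)
	n, err := lz4.UncompressBlock(data[len(c.prefix)+4:], out)
	if err != nil {
		return nil, fmt.Errorf("lz4 decompress: %w", err)
	}
	return out[:n], nil
}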

2. Option in ClusterConfig + hack into NativeType

// marshal.go

type blobCompressor interface {
	Compress([]byte) ([]byte, error)
	Decompress([]byte) ([]byte, error)
}

type ClusterConfig struct {
	...
	BlobCompressor blobCompressor
}

type NativeType struct {
	...
	blobCompressor blobCompressor
}

func getCompressor(info TypeInfo) blobCompressor {
	nt, ok := info.(NativeType)
	if !ok {
		return nil
	}
	return nt.blobCompressor
}

func unmarshalVarchar(info TypeInfo, data []byte, value interface{}) (err error) {
	if c := getCompressor(info); c != nil {
		data, err = c.Decompress(data)
		if err != nil {
			return err
		}
	}
	return unmarshalVarcharRaw(info, data, value)
}

func marshalVarchar(info TypeInfo, value interface{}) ([]byte, error) {
	data, err := marshalVarcharRaw(info, value)
	if err != nil {
		return nil, err
	}
	if c := getCompressor(info); c != nil {
		return c.Compress(data)
	}
	return data, nil
}

Pros:

  1. Easy to use
  2. Less dirty than a global variable

Cons:

  1. No per-column control; compression is either on or off for all columns in a given session.
  2. Pollutes NativeType
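Wiring option 2 up would then follow the usual ClusterConfig pattern; a sketch from the user's side (lz4.NewBlobCompressor mirrors the constructor used in this PR's tests):

cluster := gocql.NewCluster("127.0.0.1")
cluster.BlobCompressor = lz4.NewBlobCompressor(100)
session, err := cluster.CreateSession()
if err != nil {
	// handle error
}
defer session.Close()
// Every text/blob value read or written through this session is now
// (de)compressed transparently.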

3. Custom type

type CompressedType struct {
	compressor *Compressor
	val        []byte
}

func (ct *CompressedType) MarshalCQL(info TypeInfo) ([]byte, error) {
	return ct.compressor.Compress(ct.val)
}

func (ct *CompressedType) UnmarshalCQL(info TypeInfo, data []byte) (err error) {
	ct.val, err = ct.compressor.Decompress(data)
	return err
}

type Compressor struct {
	...
}

func (c *Compressor) Blob() *CompressedType

// How to use
...
	err = session.Query(
		`INSERT INTO gocql_test.test_blob_compressor (testuuid, testblob) VALUES (?, ?)`,
		TimeUUID(), compressor.Blob().Set([]byte("my value")),
	).Exec()
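The Set helper chained above is not spelled out in the sketch; one possible shape (an assumption, not part of the PR) is:

// Set stores the raw value and returns the receiver, so the call can be
// chained inline as a query argument.
func (ct *CompressedType) Set(val []byte) *CompressedType {
	ct.val = val
	return ct
}

// Get returns the raw (decompressed) value after a scan.
func (ct *CompressedType) Get() []byte {
	return ct.val
}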

Pros:

  1. Maximum control over which column is compressed
  2. No driver pollution; could be implemented as part of the lz4 package
  3. Not dirty at all

Cons:

  1. Usage is a bit cumbersome

4. Custom type with global compressor

type CompressedType []byte

func (ct CompressedType) MarshalCQL(info TypeInfo) ([]byte, error) {
	return globalcompressor.Compress(ct)
}

func (ct *CompressedType) UnmarshalCQL(info TypeInfo, data []byte) (err error) {
	*ct, err = globalcompressor.Decompress(data)
	return err
}

type Compressor struct {
	...
}

func (c *Compressor) Blob(val []byte) CompressedType

// How to use
...
	err = session.Query(
		`INSERT INTO gocql_test.test_blob_compressor (testuuid, testblob) VALUES (?, ?)`,
		TimeUUID(), compressor.Blob([]byte("my value")),
	).Exec()

Pros:

  1. Maximum control over which column is compressed
  2. No driver pollution; could be implemented as part of the lz4 package
  3. Not dirty at all
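This variant additionally needs a way to install the package-level compressor; a minimal sketch (names assumed):

// globalcompressor is consulted by CompressedType's (Un)MarshalCQL.
var globalcompressor *Compressor

// SetGlobalCompressor must be called before any CompressedType values
// are marshaled or unmarshaled.
func SetGlobalCompressor(c *Compressor) {
	globalcompressor = c
}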

@dkropachev force-pushed the dk/add-auto-compression-for-blob branch from 6902f8c to 7983675 on July 22, 2024 01:06
@mykaul commented Jul 22, 2024

I'm missing the design and the documentation for such a feature. It should not be done hastily. We need to think of interoperability and configuration. Let's have a document on this (on the issue) first.

@Lorak-mmk left a comment

A general comment: This new feature should be properly documented. Right now there is no documentation at all for it.
I know that gocql isn't very good in this regard, but the best way to improve that is to add good docs and comments for new / changed code.

	}
	m := make(map[string]interface{})

	BlobCompressor = lz4.NewBlobCompressor(100)


IIUC, assigning to this global variable activates the blob compressor?
This is a very unintuitive and hard-to-use interface:

  • it's not possible to have different compressors for different sessions / queries
  • it's much harder to test, because your tests now rely on global state, so you can't concurrently run tests that need different compressors
  • it's harder for the programmer to understand, because this is another piece of state that isn't present in any struct they use (Session, Query, etc.)

This new compressor interface is very similar to several policies / configurations the driver already has (like RetryPolicy) and should be set the same way: as a field on the session / query, with setters, as in the sketch below.
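A possible shape, mirroring the existing RetryPolicy setter (the per-query BlobCompressor setter is hypothetical):

// Session-wide default, configured like other policies:
cluster := gocql.NewCluster("127.0.0.1")
cluster.BlobCompressor = myCompressor

// Hypothetical per-query override, mirroring Query.RetryPolicy:
q := session.Query(stmt).BlobCompressor(otherCompressor)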

	limit int
}

var defaultPrefix = []byte{0x01, 0x11, 0x22, 0x33}


I don't think a default prefix should exist. It's entirely possible for stored data to start with that prefix, so it's a new footgun for a driver user: if a user just uses a compressor as-is, like you did in the tests (BlobCompressor = lz4.NewBlobCompressor(100)), it may result in corrupt data. Correct usage requires the user to think about the data they store in the database and which prefix is safe.
The best way to make the user think about this is to require explicitly passing the prefix.
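For example, a constructor that forces the choice (hypothetical signature):

// The user must pick a prefix they know cannot occur at the start of
// values they store in this column:
BlobCompressor = lz4.NewBlobCompressor([]byte{0xC0, 0x4F, 0xFE, 0xE1}, 100)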

Comment on lines 81 to 101
// Test for Iter.MapScan()
{
	testMap := make(map[string]interface{})
	if !session.Query(`SELECT * FROM test_blob_compressor`).Iter().MapScan(testMap) {
		t.Fatal("MapScan failed to work with one row")
	}
	if diff := cmp.Diff(sliceMap[0], testMap); diff != "" {
		t.Fatal("mismatch in returned map", diff)
	}
}

// Test for Query.MapScan()
{
	testMap := make(map[string]interface{})
	if session.Query(`SELECT * FROM test_blob_compressor`).MapScan(testMap) != nil {
		t.Fatal("MapScan failed to work with one row")
	}
	if diff := cmp.Diff(sliceMap[0], testMap); diff != "" {
		t.Fatal("mismatch in returned map", diff)
	}
}


IIUC this test checks insert/select identity, so it would pass even if no compressor was set or the marshal module didn't use the compressor, right?
If so, then it should be improved: it's an integration test, so it should check that the new feature works properly with the rest of the driver, which it currently doesn't.

Comment on lines 30 to 93
	}
	buf := make([]byte, len(c.prefix)+lz4.CompressBlockBound(len(data)+4))
	copy(buf, c.prefix)


This +4 looks like a mistake: we compress data of size len(data), not len(data)+4.

Comment on lines +37 to +102
	// According to lz4.CompressBlock doc, it doesn't fail as long as the dst
	// buffer length is at least lz4.CompressBlockBound(len(data)) bytes, but
	// we check for error anyway just to be thorough.
	if err != nil {
		return nil, err
	}


Failure here is a very unexpected condition; maybe it deserves a log entry?

Comment on lines 52 to 54
	if len(data) < 4+len(c.prefix) {
		return nil, fmt.Errorf("compressed data should be >4, got=%d", len(data))
	}


4+len(c.prefix) -> prefixPlusLen


This compressor should have unit tests for various scenarios:

  • decompressing data without the correct prefix
  • compress / decompress identity
  • compress, then verify that the resulting buffer is correct
  • decompress with an incorrect length in the buffer

and preferably any more scenarios you can think of; a starting point is sketched below.
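For instance, a minimal round-trip test along these lines (assuming the NewBlobCompressor constructor and defaultPrefix from this file, and assuming unprefixed data passes through Decompress unchanged):

func TestBlobCompressorRoundTrip(t *testing.T) {
	c := NewBlobCompressor(0) // limit 0: compress everything

	in := bytes.Repeat([]byte("payload "), 64)
	compressed, err := c.Compress(in)
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.HasPrefix(compressed, defaultPrefix) {
		t.Fatalf("compressed data missing prefix: %x", compressed)
	}

	out, err := c.Decompress(compressed)
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(in, out) {
		t.Fatal("compress/decompress is not an identity")
	}

	// Data without the prefix should come back untouched (assumption).
	raw := []byte("plain value")
	if got, err := c.Decompress(raw); err != nil || !bytes.Equal(raw, got) {
		t.Fatal("unprefixed data should pass through unchanged")
	}
}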

@Lorak-mmk commented Jul 22, 2024

The cover letter for the PR could be a bit more elaborate:

  • why is this feature needed, and what are the use cases (since this was not covered in the issue)?
  • why was this specific implementation chosen, and what were the alternatives and tradeoffs?
  • why should this even be in the driver? There is nothing preventing doing the compression in code that uses the driver instead of in the driver itself, and AFAIK gocql strives not to add unnecessary features.

For examples of good PR cover letters see:

@dkropachev (Collaborator, Author) commented

@mykaul, @Lorak-mmk, thanks for reviewing it. Could you please take a look at the PR description? I have laid out all the options for implementing this feature; let's decide which way to go. I myself prefer N3 and N4. This PR was intended to be a draft just to test things out; unfortunately, I misclicked and could not switch it back.

@sylwiaszunejko marked this pull request as draft on July 22, 2024 12:30
@sylwiaszunejko (Collaborator) commented

I converted it to draft.

@mykaul commented Jul 22, 2024

If a client stores big blobs of data, compressing the data that goes into that field will reduce the footprint of select/update operations on both the network and the server.

  1. Network - you already have client<->server network compression.
  2. Server - the default is already compressing with LZ4.

Both, by the way, use LZ4 (or Snappy) by default, which is less suitable for JSON/text blobs (as it has no entropy coding stage).
I think we should use zstd for it. (Another advantage is that I hope one day we'll do zstd client<->server as well...)

  1. You definitely need control over it: a minimum length below which compressing isn't even reasonable, and a compression-ratio threshold above which it makes sense to keep the data uncompressed (see the sketch below).
  2. If we do implement it, we'd want it for multiple drivers, not just gocql.
  3. It's unclear to me how we determine which blobs to compress and which not to. JPEG blobs are not an ideal candidate, for example.

Again, this will be easier to understand and discuss on the issue than on the PR.
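A sketch of the guard from point 1 (minLen, keepRatio, and compressRaw are assumed names, not part of the PR):

func (c *BlobCompressor) Compress(data []byte) ([]byte, error) {
	if len(data) < c.minLen {
		return data, nil // too short for compression to pay off
	}
	compressed, err := c.compressRaw(data) // underlying zstd/lz4 call
	if err != nil {
		return nil, err
	}
	// If compression saved less than the configured fraction, store the
	// original; the prefix scheme lets Decompress tell the two apart.
	if float64(len(compressed)) > float64(len(data))*c.keepRatio {
		return data, nil
	}
	return compressed, nil
}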

@dkropachev force-pushed the dk/add-auto-compression-for-blob branch from 7983675 to fe93f24 on July 23, 2024 14:42
@dkropachev force-pushed the dk/add-auto-compression-for-blob branch from fe93f24 to 847b9e4 on July 23, 2024 14:51