Skip to content

Commit

Permalink
Support for configurable compression algorithm in Zarr IO (chapel-lan…
Browse files Browse the repository at this point in the history
…g#25751)

This PR adds support for selecting between different compression
algorithms when writing Zarr stores. Supported options are `blosclz`,
`lz4`, `lz4hc`, `zlib`, and `zstd`. This PR also includes tests for
these options. Resolves
Cray/chapel-private#6588.

Zarr tests run locally, paratest passes on ChapDL

Reviewed by: @benharsh
  • Loading branch information
brandon-neth authored Aug 14, 2024
2 parents dda3726 + 9e5d38e commit f600933
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 14 deletions.
44 changes: 30 additions & 14 deletions modules/packages/Zarr.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
* limitations under the License.
*/

/*
Support for distributed reading and writing of Zarr stores. Support is
limited to v2 Zarr arrays stored on local filesystems. NFS is not supported.
The module uses c-blosc to compress and decompress chunks. Zarr
specification: https://zarr-specs.readthedocs.io/en/latest/v2/v2.0.html
/* Support for reading and writing of Zarr stores.
Support is limited to v2 Zarr arrays stored on local filesystems. NFS
is not supported. The module uses c-blosc to compress and decompress chunks.
Zarr specification: https://zarr-specs.readthedocs.io/en/latest/v2/v2.0.html
*/
module Zarr {
use IO;
Expand Down Expand Up @@ -83,6 +83,7 @@ module Zarr {
var chunks: list(int);
var dtype: string;
var shape: list(int);
var compressor: string;
}

/* Unused until support is added for v3.0 stores */
Expand Down Expand Up @@ -141,6 +142,13 @@ module Zarr {
throw new Error("Expected entries of type %s. Found %s".format(dtype:string, chplType));
}

private proc validateCompressor(compressor) throws {
const supportedCompressors = ["blosclz", "lz4", "lz4hc", "zlib", "zstd"];
if supportedCompressors.find(compressor) == -1 {
throw new IllegalArgumentError("Unsupported compressor: %s.".format(compressor) +
" Supported compressors are: blosclz, lz4, lz4hc, zlib, and zstd.");
}
}

private proc buildChunkPath(directoryPath: string, delimiter: string, const chunkIndex: ?dimCount * int) {
var indexStrings: dimCount*string;
Expand Down Expand Up @@ -277,7 +285,7 @@ module Zarr {
:throws Error: If the compression fails
*/
proc writeChunk(param dimCount, chunkPath: string, chunkDomain: domain(dimCount), ref arraySlice: [] ?t, bloscLevel: int(32) = 9) throws {
proc writeChunk(param dimCount, chunkPath: string, chunkDomain: domain(dimCount), ref arraySlice: [] ?t, bloscLevel: int(32) = 9, compressor: string="blosclz") throws {
var s: stopwatch;

// bloscLevel must be between 0 and 9
Expand Down Expand Up @@ -305,7 +313,7 @@ module Zarr {
var bytesCompressed = blosc_compress_ctx(_bloscLevel, 0, c_sizeof(t),
(copyOut.size*c_sizeof(t)) : c_size_t, c_ptrTo(copyOut),
compressedBuffer, ((copyOut.size + 16) * c_sizeof(t)) : c_size_t,
"blosclz", 0 : c_size_t, 1 : c_size_t);
compressor.c_str(), 0 : c_size_t, 1 : c_size_t);
if bytesCompressed == 0 then
throw new Error("Failed to compress bytes");
if zarrProfiling then times["Compression"].add(s.elapsed());
Expand Down Expand Up @@ -404,14 +412,18 @@ module Zarr {
:arg bloscLevel: Compression level to use. 0 indicates no compression,
9 (default) indicates maximum compression.
:arg compressor: Compression algorithm to use. Supported values are "blosclz" (default),
"lz4", "lz4hc", "zlib", and "zstd".
*/
proc writeZarrArray(directoryPath: string, const ref A: [?domainType] ?dtype, chunkShape: ?dimCount*int, bloscLevel: int(32) = 9) throws {
proc writeZarrArray(directoryPath: string, const ref A: [?domainType] ?dtype, chunkShape: ?dimCount*int, bloscLevel: int(32) = 9, compressor="blosclz") throws {

// Create the metadata record that is written before the chunks
var shape, chunks: list(int);
for size in A.shape do shape.pushBack(size);
for size in chunkShape do chunks.pushBack(size);
const md: zarrMetadataV2 = new zarrMetadataV2(2, chunks, dtypeString(dtype), shape);
validateCompressor(compressor);
const md: zarrMetadataV2 = new zarrMetadataV2(2, chunks, dtypeString(dtype), shape, compressor);

// Clear the directory before writing
if exists(directoryPath) then rmTree(directoryPath);
Expand Down Expand Up @@ -454,7 +466,7 @@ module Zarr {
ref thisChunkSlice = hereA.localSlice(thisChunkHere);
const chunkPath = buildChunkPath(directoryPath, ".", chunkIndex);
locks[chunkIndex].writeEF(true);
writeChunk(dimCount, chunkPath, thisChunkDomain, thisChunkSlice, bloscLevel=bloscLevel);
writeChunk(dimCount, chunkPath, thisChunkDomain, thisChunkSlice, bloscLevel=bloscLevel, compressor=compressor);
locks[chunkIndex].readFE();
}
}
Expand Down Expand Up @@ -521,14 +533,18 @@ module Zarr {
:arg bloscLevel: Compression level to use. 0 indicates no compression,
9 (default) indicates maximum compression.
:arg compressor: Compression algorithm to use. Supported values are "blosclz" (default),
"lz4", "lz4hc", "zlib", and "zstd".
*/
proc writeZarrArrayLocal(directoryPath: string, ref A: [?domainType] ?dtype, chunkShape: ?dimCount*int, bloscLevel: int(32) = 9) throws {
proc writeZarrArrayLocal(directoryPath: string, ref A: [?domainType] ?dtype, chunkShape: ?dimCount*int, bloscLevel: int(32) = 9, compressor="blosclz") throws {

// Create the metadata record that is written before the chunks
var shape, chunks: list(int);
for size in A.shape do shape.pushBack(size);
for size in chunkShape do chunks.pushBack(size);
const md: zarrMetadataV2 = new zarrMetadataV2(2, chunks, dtypeString(dtype), shape);
validateCompressor(compressor);
const md: zarrMetadataV2 = new zarrMetadataV2(2, chunks, dtypeString(dtype), shape, compressor);

// Clear the directory before writing
if exists(directoryPath) then rmTree(directoryPath);
Expand All @@ -555,7 +571,7 @@ module Zarr {
const chunkForDomain = D[chunkBounds];
ref chunkData = normA[chunkForDomain];
const chunkPath = buildChunkPath(directoryPath, ".", chunkIndex);
writeChunk(dimCount, chunkPath, chunkBounds, chunkData, bloscLevel=bloscLevel);
writeChunk(dimCount, chunkPath, chunkBounds, chunkData, bloscLevel=bloscLevel, compressor=compressor);
}

blosc_destroy();
Expand Down Expand Up @@ -593,7 +609,7 @@ module Zarr {
const chunkPath = buildChunkPath(directoryPath, ".", chunkIndex);

blosc_init();
writeChunk(dimCount, chunkPath, chunkData.domain, chunkData);
writeChunk(dimCount, chunkPath, chunkData.domain, chunkData, compressor=md.compressor);
blosc_destroy();
}

Expand Down
66 changes: 66 additions & 0 deletions test/library/packages/Zarr/ZarrCompressors.chpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use Zarr;
use IO;
use FileSystem;
use BlockDist;
use Random;

proc localTest(compressor: string) {
const N = 20;
const D: domain(2) = {0..<N, 0..<2*N};
var A: [D] real(32);
fillRandom(A);

if (exists("TestLocal")) then rmTree("TestLocal");
writeZarrArrayLocal("TestLocal", A, (7,18), compressor=compressor);

var B = readZarrArrayLocal("TestLocal", real(32), 2);
assert(A.domain == B.domain, "Domain mismatch: %? %?".format(A.domain, B.domain));
forall i in A.domain do
assert(A[i] == B[i], "Mismatch for real data on indices: %?.\nWritten: %?\nRead: %?".format(i, A[i], B[i]));
rmTree("TestLocal");
}

proc distributedTest(compressor: string) {
const N = 20;
const D: domain(2) dmapped new blockDist({0..N, 0..<2*N}) = {0..<N, 0..<2*N};
var A: [D] real(32);
fillRandom(A);

if (exists("TestDistributed")) then rmTree("TestDistributed");
writeZarrArray("TestDistributed", A, (7,18), compressor=compressor);

var B = readZarrArray("TestDistributed", real(32), 2);
assert(A.domain == B.domain, "Domain mismatch: %? %?".format(A.domain, B.domain));
forall i in A.domain do
assert(A[i] == B[i], "Mismatch for real data on indices: %?.\nWritten: %?\nRead: %?".format(i, A[i], B[i]));
rmTree("TestDistributed");
}

proc testUnsupportedCompressor() {
var A: [1..10] real(32);
try {
writeZarrArrayLocal("TestUnsupportedCompressor", A, (7,), compressor="unsupported");
assert(false, "Expected an error for unsupported compressor");
} catch e {
assert(e.message() == "Unsupported compressor: unsupported. Supported compressors are: blosclz, lz4, lz4hc, zlib, and zstd.", e.message());
}
try {
writeZarrArray("TestUnsupportedCompressor", A, (7,), compressor="unsupported");
assert(false, "Expected an error for unsupported compressor");
} catch e {
assert(e.message() == "Unsupported compressor: unsupported. Supported compressors are: blosclz, lz4, lz4hc, zlib, and zstd.", e.message());
}

}


proc main() {
var compressors = ["blosclz", "lz4", "lz4hc", "zlib", "zstd"];
for compressor in compressors {
writeln("Testing ", compressor);
localTest(compressor);
distributedTest(compressor);
}
testUnsupportedCompressor();
writeln("Pass");
}
6 changes: 6 additions & 0 deletions test/library/packages/Zarr/ZarrCompressors.good
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Testing blosclz
Testing lz4
Testing lz4hc
Testing zlib
Testing zstd
Pass

0 comments on commit f600933

Please sign in to comment.