Skip to content

Commit

Permalink
support ByteBuffers that have hasArray=false (#838)
Browse files Browse the repository at this point in the history
* support ByteBuffers that have hasArray=false

* format

* add tests

* fix merge issue
  • Loading branch information
pjfanning authored May 25, 2024
1 parent 80b243d commit 5c97409
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.io.{ByteArrayInputStream, File, InputStream}
import java.nio.ByteBuffer
import java.nio.file.{Files, Path, Paths}

import com.sksamuel.avro4s.avroutils.ByteBufferHelper
import org.apache.avro.Schema

import scala.util.Try
Expand Down Expand Up @@ -55,7 +56,8 @@ class AvroInputStreamBuilder[T: Decoder](format: AvroFormat) {
def from(file: File): AvroInputStreamBuilderWithSource[T] = from(file.toPath)
def from(in: InputStream): AvroInputStreamBuilderWithSource[T] = new AvroInputStreamBuilderWithSource(format, in)
def from(bytes: Array[Byte]): AvroInputStreamBuilderWithSource[T] = from(new ByteArrayInputStream(bytes))
def from(buffer: ByteBuffer): AvroInputStreamBuilderWithSource[T] = from(new ByteArrayInputStream(buffer.array))
def from(buffer: ByteBuffer): AvroInputStreamBuilderWithSource[T] = from(
new ByteArrayInputStream(ByteBufferHelper.asArray(buffer)))
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import java.nio.ByteBuffer
import java.time.Instant
import java.util.UUID

import com.sksamuel.avro4s.avroutils.ByteBufferHelper
import org.apache.avro.LogicalTypes.Decimal
import org.apache.avro.generic.{GenericEnumSymbol, GenericFixed}
import org.apache.avro.generic.GenericFixed
import org.apache.avro.util.Utf8
import org.apache.avro.{Conversions, Schema}
//import CustomDefaults._
Expand Down Expand Up @@ -35,7 +36,7 @@ object DefaultResolver {
val decimalConversion = new Conversions.DecimalConversion
val bd = decimalConversion.fromBytes(byteBuffer, schema, schema.getLogicalType)
java.lang.Double.valueOf(bd.doubleValue)
case byteBuffer: ByteBuffer => byteBuffer.array()
case byteBuffer: ByteBuffer => ByteBufferHelper.asArray(byteBuffer)
case x: scala.Long => java.lang.Long.valueOf(x)
case x: scala.Boolean => java.lang.Boolean.valueOf(x)
case x: scala.Int => java.lang.Integer.valueOf(x)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.sksamuel.avro4s.avroutils

import java.nio.ByteBuffer

private[avro4s] object ByteBufferHelper {
def asArray(byteBuffer: ByteBuffer): Array[Byte] = {
if (byteBuffer.hasArray) {
byteBuffer.array()
} else {
val bytes = new Array[Byte](byteBuffer.remaining)
byteBuffer.get(bytes)
bytes
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.sksamuel.avro4s.decoders

import com.sksamuel.avro4s.avroutils.ByteBufferHelper
import com.sksamuel.avro4s.{Avro4sDecodingException, Decoder}
import org.apache.avro.Schema

Expand All @@ -18,7 +19,7 @@ trait ByteDecoders:
object ArrayByteDecoder extends Decoder[Array[Byte]] :
override def decode(schema: Schema): Any => Array[Byte] = { value =>
value match {
case buffer: ByteBuffer => buffer.array
case buffer: ByteBuffer => ByteBufferHelper.asArray(buffer)
case array: Array[Byte] => array
case fixed: org.apache.avro.generic.GenericFixed => fixed.bytes
case _ => throw new Avro4sDecodingException(s"ArrayByteDecoder cannot decode '$value'", value)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.sksamuel.avro4s.decoders

import com.sksamuel.avro4s.encoders.{ByteStringEncoder, StringEncoder, UTF8StringEncoder}
import com.sksamuel.avro4s.{Avro4sConfigurationException, Avro4sDecodingException, Decoder, Encoder}
import org.apache.avro.generic.{GenericData, GenericFixed}
import com.sksamuel.avro4s.avroutils.ByteBufferHelper
import com.sksamuel.avro4s.{Avro4sConfigurationException, Avro4sDecodingException, Decoder}
import org.apache.avro.generic.GenericFixed
import org.apache.avro.util.Utf8
import org.apache.avro.{AvroRuntimeException, Schema}
import org.apache.avro.Schema

import java.nio.ByteBuffer
import java.util.UUID
Expand All @@ -28,7 +28,7 @@ object StringDecoder extends Decoder[String] :
case string: String => string
case charseq: CharSequence => charseq.toString
case b: Array[Byte] => new Utf8(b).toString
case bytes: ByteBuffer => new Utf8(bytes.array()).toString
case bytes: ByteBuffer => new Utf8(ByteBufferHelper.asArray(bytes)).toString
case fixed: GenericFixed => new Utf8(fixed.bytes()).toString
case _ => throw new Avro4sDecodingException(s"Unsupported type $string ${string.getClass} for StringDecoder", string)
}
Expand All @@ -41,7 +41,7 @@ object CharSequenceDecoder extends Decoder[CharSequence]:
case string: String => string
case charseq: CharSequence => charseq
case b: Array[Byte] => new Utf8(b)
case bytes: ByteBuffer => new Utf8(bytes.array())
case bytes: ByteBuffer => new Utf8(ByteBufferHelper.asArray(bytes))
case fixed: GenericFixed => new Utf8(fixed.bytes())
}
}
Expand All @@ -58,7 +58,7 @@ object UTF8Decoder extends Decoder[Utf8] :
case utf8: Utf8 => utf8
case string: String => new Utf8(string)
case b: Array[Byte] => new Utf8(b)
case bytes: ByteBuffer => new Utf8(bytes.array())
case bytes: ByteBuffer => new Utf8(ByteBufferHelper.asArray(bytes))
case fixed: GenericFixed => new Utf8(fixed.bytes())
}
}
Expand Down Expand Up @@ -88,7 +88,7 @@ object ByteStringDecoder extends Decoder[String] :
override def decode(schema: Schema): Any => String = { input =>
input match {
case b: Array[Byte] => new Utf8(b).toString
case bytes: ByteBuffer => new Utf8(bytes.array()).toString
case bytes: ByteBuffer => new Utf8(ByteBufferHelper.asArray(bytes)).toString
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.sksamuel.avro4s.encoders

import com.sksamuel.avro4s.avroutils.ByteBufferHelper
import com.sksamuel.avro4s.{Avro4sConfigurationException, Encoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
Expand Down Expand Up @@ -37,7 +38,8 @@ object ByteArrayEncoder extends Encoder[Array[Byte]] :
object FixedByteBufferEncoder extends Encoder[ByteBuffer] {
override def encode(schema: Schema): ByteBuffer => Any = { value =>
val array = new Array[Byte](schema.getFixedSize)
System.arraycopy(value.array(), 0, array, 0, value.array().length)
val bbArray = ByteBufferHelper.asArray(value)
System.arraycopy(bbArray, 0, array, 0, bbArray.length)
GenericData.get.createFixed(null, array, schema)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.sksamuel.avro4s.encoders

import com.sksamuel.avro4s.{Avro4sConfigurationException, Avro4sEncodingException, Encoder, FieldMapper}
import org.apache.avro.Conversions.UUIDConversion
import org.apache.avro.{Conversions, LogicalTypes, Schema}
import com.sksamuel.avro4s.avroutils.ByteBufferHelper
import com.sksamuel.avro4s.{Avro4sConfigurationException, Avro4sEncodingException, Encoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.util.Utf8

Expand Down Expand Up @@ -49,4 +49,5 @@ object FixedStringEncoder extends Encoder[String] :
val bytes = string.getBytes(StandardCharsets.UTF_8)
if (bytes.length > schema.getFixedSize)
throw new Avro4sEncodingException(s"Cannot write string with ${bytes.length} bytes to fixed type of size ${schema.getFixedSize}")
GenericData.get.createFixed(null, ByteBuffer.allocate(schema.getFixedSize).put(bytes).array, schema).asInstanceOf[GenericData.Fixed]
GenericData.get.createFixed(null,
ByteBufferHelper.asArray(ByteBuffer.allocate(schema.getFixedSize).put(bytes)), schema).asInstanceOf[GenericData.Fixed]
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ class ByteArrayDecoderTest extends AnyFunSuite with Matchers {
Decoder[VectorTest].decode(schema).apply(record).z shouldBe Vector[Byte](1, 4, 9)
}

test("decode read-only ByteBuffer to Vector[Byte]") {
val schema = AvroSchema[VectorTest]
val record = new GenericData.Record(schema)
record.put("z", ByteBuffer.wrap(Array[Byte](1, 4, 9)).asReadOnlyBuffer())
Decoder[VectorTest].decode(schema).apply(record).z shouldBe Vector[Byte](1, 4, 9)
}

test("decode Array[Byte] to List[Byte]") {
val schema = AvroSchema[ListTest]
val record = new GenericData.Record(schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ class ByteArrayEncoderTest extends AnyFunSuite with Matchers {
fixed.bytes().length shouldBe 7
}

test("encode byte buffers as FIXED (ReadOnlyBuffer)") {
val schema = SchemaBuilder.fixed("foo").size(7)
val fixed = Encoder[ByteBuffer]
.encode(schema)
.apply(ByteBuffer.wrap("hello".getBytes).asReadOnlyBuffer())
.asInstanceOf[GenericFixed]
fixed.bytes().toList shouldBe Seq(104, 101, 108, 108, 111, 0, 0)
fixed.bytes().length shouldBe 7
}

test("encode top level byte arrays") {
val encoder = Encoder[Array[Byte]]
Expand Down

0 comments on commit 5c97409

Please sign in to comment.