diff --git a/rsocket-messages/src/main/java/com/jauntsdn/rsocket/Headers.java b/rsocket-messages/src/main/java/com/jauntsdn/rsocket/Headers.java index 4b870b3..cf7bdeb 100644 --- a/rsocket-messages/src/main/java/com/jauntsdn/rsocket/Headers.java +++ b/rsocket-messages/src/main/java/com/jauntsdn/rsocket/Headers.java @@ -34,16 +34,19 @@ public final class Headers { public static int HEADER_LENGTH_MAX = 8192; - private static final Headers EMPTY = new Headers(false, Collections.emptyList(), 0); - private static final Headers DEFAULT_SERVICE = new Headers(true, Collections.emptyList(), 0); + private static final Headers EMPTY = new Headers(false, 0, Collections.emptyList(), 0); + private static final Headers DEFAULT_SERVICE = new Headers(true, 0, Collections.emptyList(), 0); private final boolean isDefaultService; private final int serializedSize; + private final long timeoutMillis; private final List keyValues; private volatile ByteBuf cache; - private Headers(boolean isDefaultService, List keyValues, int serializedSize) { + private Headers( + boolean isDefaultService, long timeoutMillis, List keyValues, int serializedSize) { this.isDefaultService = isDefaultService; + this.timeoutMillis = timeoutMillis; this.keyValues = keyValues; this.serializedSize = serializedSize; } @@ -52,6 +55,10 @@ public boolean isDefaultService() { return isDefaultService; } + public long timeoutMillis() { + return timeoutMillis; + } + public String header(String name) { if (!isValidKeySize(name)) { return null; @@ -155,7 +162,7 @@ public static Headers create(boolean isDefaultService, String... headers) { if (headers.length == 0) { return isDefaultService ? DEFAULT_SERVICE : EMPTY; } - return new Headers(isDefaultService, Arrays.asList(headers), serializedSize); + return new Headers(isDefaultService, 0, Arrays.asList(headers), serializedSize); } public static Headers empty() { @@ -166,6 +173,14 @@ public static Headers withDefaultService() { return DEFAULT_SERVICE; } + public static Headers withTimeout(long timeoutMillis) { + requireNonNegative(timeoutMillis, "timeoutMillis"); + if (timeoutMillis == 0) { + return EMPTY; + } + return new Headers(false, timeoutMillis, Collections.emptyList(), 0); + } + public static Headers.Builder newBuilder() { return new Builder(4, Collections.emptyList()); } @@ -179,7 +194,7 @@ static Headers create(List headers) { if (headers.isEmpty()) { return EMPTY; } - return new Headers(false, headers, serializedSize); + return new Headers(false, 0, headers, serializedSize); } ByteBuf cache() { @@ -203,6 +218,7 @@ public int serializedSize() { public static final class Builder { private final List nameValues; private boolean isDefaultService; + private long timeoutMillis; private int serializedSize; private Builder(int size, List headers) { @@ -225,6 +241,11 @@ public Builder defaultService(boolean isDefaultService) { return this; } + public Builder timeout(long timeoutMillis) { + this.timeoutMillis = requireNonNegative(timeoutMillis, "timeoutMillis"); + return this; + } + public Builder add(String name, String value) { requireValidKeySize(name, " name"); requireValidValueSize(value, " value"); @@ -278,7 +299,7 @@ public Builder remove(String name, String value) { } public Headers build() { - return new Headers(isDefaultService, nameValues, serializedSize); + return new Headers(isDefaultService, timeoutMillis, nameValues, serializedSize); } } @@ -316,6 +337,13 @@ private static int requireValid(List keyValues, String message) { return size; } + private static long requireNonNegative(long value, String message) { + if (value < 0) { + throw new IllegalArgumentException(message + " must be non-negative"); + } + return value; + } + private static int requireValid(String[] keyValues, String message) { Objects.requireNonNull(keyValues, "keyValues"); int length = keyValues.length;