forked from cainus/Prozess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
FetchResponse.js
109 lines (95 loc) · 3.66 KB
/
FetchResponse.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
var binary = require('binary');
var BufferMaker = require('buffermaker');
var Message = require('./Message');
var Response = require('./Response');
var _ = require('underscore');
// Header sizes reported by the Response and Message modules. They are read
// once at load time and used below for length arithmetic when parsing and
// sizing fetch responses (presumably byte counts -- confirm in those modules).
var RESPONSE_HEADER_LENGTH = Response.getHeaderLength();
var MESSAGE_HEADER_LENGTH = Message.getHeaderLength();
/* HEADER
     0                   1                   2                   3
     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |                        RESPONSE_LENGTH                        |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |          ERROR_CODE           |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    RESPONSE_LENGTH = int32 // Length in bytes of entire response (excluding this field)
    ERROR_CODE      = int16 // See table below.
    ================ ===== ===================================================
    ERROR_CODE       VALUE DEFINITION
    ================ ===== ===================================================
    Unknown             -1 Unknown Error
    NoError              0 Success
    OffsetOutOfRange     1 Offset requested is no longer available on the server
    InvalidMessage       2 A message you sent failed its checksum and is corrupt.
    WrongPartition       3 You tried to access a partition that doesn't exist
                           (was not between 0 and (num_partitions - 1)).
    InvalidFetchSize     4 The size you requested for fetching is smaller than
                           the message you're trying to fetch.
    ================ ===== ===================================================
*/
/* FETCH RESPONSE
     0                   1                   2                   3
     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    /                        RESPONSE HEADER                        /
    /                                                               /
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    /                     MESSAGES (0 or more)                      /
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
/**
 * A fetch response: an error code followed by zero or more messages.
 *
 * Sets the following fields on the instance:
 *   - error:          the error code passed in.
 *   - messages:       the array of message objects passed in.
 *   - length:         2 (the int16 error code) plus, for every message, its
 *                     payload length plus MESSAGE_HEADER_LENGTH. This mirrors
 *                     RESPONSE_LENGTH, which excludes its own 4-byte field.
 *   - bytesLengthVal: cache for byteLength(); null until first computed.
 *
 * @param {number} error    error code (see the ERROR_CODE table above)
 * @param {Array}  messages message objects, each exposing `payload.length`
 */
var FetchResponse = function(error, messages){
  var that = this;
  this.error = error;
  this.messages = messages; // an array of message objects
  // Start with the 2 bytes taken by the error code.
  this.length = 2;
  _.each(this.messages, function(msg){
    // Accumulate: plain assignment here would discard the running total and
    // leave only the size of the last message.
    that.length += msg.payload.length + MESSAGE_HEADER_LENGTH;
  });
  this.bytesLengthVal = null;
};
/**
 * Size of this response, as recorded in `bytesLengthVal`.
 *
 * When no value is cached yet (the field is falsy), the response is
 * serialised once via toBytes(), which stores the resulting length in
 * `bytesLengthVal` as a side effect.
 *
 * @returns {number} the cached length
 */
FetchResponse.prototype.byteLength = function(){
  if (this.bytesLengthVal){
    return this.bytesLengthVal;
  }
  // Nothing cached: serialising populates bytesLengthVal.
  this.toBytes();
  return this.bytesLengthVal;
};
/**
 * Serialises this response.
 *
 * Layout written, in order:
 *   - UInt32BE: length of the concatenated message bytes plus 2
 *               (the 2 accounts for the error code that follows)
 *   - UInt16BE: this.error
 *   - the bytes of every message (msg.toBytes()), in array order
 *
 * Side effect: stores the total serialised length in `bytesLengthVal`.
 *
 * @returns {Buffer} the result of BufferMaker#make()
 */
FetchResponse.prototype.toBytes = function(){
  // Concatenate the serialised messages first so their total size is known.
  var payloadMaker = new BufferMaker();
  _.each(this.messages, function(message){
    payloadMaker.string(message.toBytes());
  });
  var payload = payloadMaker.make();

  var frameMaker = new BufferMaker();
  frameMaker.UInt32BE(payload.length + 2);
  // NOTE(review): written unsigned, while the header comment describes
  // ERROR_CODE as int16 and lists -1 (Unknown) -- confirm how -1 should be
  // encoded.
  frameMaker.UInt16BE(this.error);
  frameMaker.string(payload);
  var frame = frameMaker.make();

  this.bytesLengthVal = frame.length;
  return frame;
};
/**
 * Builds a FetchResponse from raw bytes.
 *
 * The bytes are first decoded with Response.fromBytes. If the decoded error
 * is anything other than Response.Errors.NoError the body is not parsed and
 * the result carries an empty message list; otherwise the body is handed to
 * parseMessages.
 *
 * @param {Buffer} bytes raw response bytes, passed to Response.fromBytes
 * @returns {FetchResponse} new instance with the decoded error and messages
 */
FetchResponse.fromBytes = function(bytes){
  var parsed = Response.fromBytes(bytes);
  var succeeded = (parsed.error === Response.Errors.NoError);
  var messages = succeeded ? parseMessages(parsed.body) : [];
  // NOTE(review): `this` is the receiver of the call (the FetchResponse
  // constructor when invoked as FetchResponse.fromBytes), so this write does
  // not reach the instance returned below -- confirm whether that is intended.
  this.bytesLengthVal = parsed.body.length + RESPONSE_HEADER_LENGTH;
  return new FetchResponse(parsed.error, messages);
};
module.exports = FetchResponse;
/**
 * Decodes as many consecutive messages as possible from `body`.
 *
 * Each iteration decodes one message from the front of the remaining bytes
 * and then skips past it (payload length plus MESSAGE_HEADER_LENGTH).
 * Parsing is best-effort: the first exception thrown while decoding or
 * advancing ends the loop, and the messages collected so far are returned
 * (presumably this covers a truncated trailing message -- confirm against
 * Message.fromBytes).
 *
 * @param {Buffer} body bytes following the response header
 * @returns {Array} the decoded messages, in order
 */
var parseMessages = function(body){
  var parsed = [];
  var remaining = body;
  var next;
  var consumed;
  while (remaining.length > 0){
    try {
      next = Message.fromBytes(remaining);
      consumed = next.payload.length + MESSAGE_HEADER_LENGTH;
      remaining = remaining.slice(consumed);
      parsed.push(next);
    } catch (ex){
      // Stop at the first undecodable chunk; keep what was parsed.
      break;
    }
  }
  return parsed;
};