-
Notifications
You must be signed in to change notification settings - Fork 0
/
statement.js
118 lines (102 loc) · 3.4 KB
/
statement.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
const fetch = require('node-fetch');
const https = require('https');
const HttpsProxyAgent = require('https-proxy-agent');
const statement = async (config = {}) => {
const { url, statement, database, schema, role, token, proxy, wait = 300 } = config;
const timeout = wait === 0 ? 0 : wait ? Math.min(wait, 604800) : undefined;
const method = "POST";
const body = {
statement,
database,
schema,
role,
timeout
};
const headers = {
"Content-type": "application/json",
"Authorization": `Bearer ${token}`
};
const options = {
method,
headers,
body: JSON.stringify(body)
};
if(proxy) {
const httpsAgent = new HttpsProxyAgent.HttpsProxyAgent(proxy);
options.agent = httpsAgent;
}
console.log(JSON.stringify(body));
// Run query
const result = await fetch(url, options);
const initialResponse = await result.json();
console.log(initialResponse.code, initialResponse.message);
let response;
// If query is still running, retry
if(initialResponse.code === "333334") {
try {
response = await getAsyncQueryResult(url, token, initialResponse.statementHandle, proxy, wait);
} catch (error) {
throw error;
}
} else if(initialResponse.code === "390318"){
throw new Error(initialResponse.message, {cause: "390318"})
} else if(initialResponse.code === "390303"){
throw new Error(initialResponse.message, {cause: "390303"})
} else {
response = initialResponse;
}
// Get metadata
const { resultSetMetaData: { partitionInfo, rowType }, statementHandle } = response;
// Get data from partitions
const dataArrays = await Promise.all(
partitionInfo.map (
async(partition, index) => await getPartition(url, token, statementHandle, proxy, index)
)
);
const data = dataArrays.flat(1);
return {rowType, data};
};
async function getPartition (baseUrl, token, statementHandle, proxy, partition) {
const headers = {
"Content-type": "application/json",
"Authorization": `Bearer ${token}`
};
const options = {headers};
if(proxy) {
const httpsAgent = new HttpsProxyAgent.HttpsProxyAgent(proxy);
options.agent = httpsAgent;
}
const url = `${baseUrl}${statementHandle}?partition=${partition}`;
const result = await fetch(url, options);
const response = await result.json();
return response.data;
}
async function getAsyncQueryResult (baseUrl, token, statementHandle, proxy, wait = 300, attempts = 0) {
attempts++;
const headers = {
"Content-type": "application/json",
"Authorization": `Bearer ${token}`
};
const url = baseUrl + statementHandle;
const options = {headers};
if(proxy) {
const httpsAgent = new HttpsProxyAgent.HttpsProxyAgent(proxy);
options.agent = httpsAgent;
}
const result = await fetch(url, options);
const response = await result.json();
if(response.code === "333334") {
if(attempts > wait) {
throw new Error(`Query did not complete within ${wait} seconds`);
} else {
console.log(`Query still running, retrying (attempt ${attempts}/${wait})`);
await new Promise(resolve => setTimeout(resolve, 1000));
return await getAsyncQueryResult(baseUrl, token, statementHandle, proxy, wait, attempts);
}
} else if (!response?.resultSetMetaData?.partitionInfo) {
throw new Error(`No partition info found in reponse. Response: ${JSON.stringify(response)}`);
} else {
return response;
}
}
module.exports = statement;