Skip to content

Commit

Permalink
Use zed-js for all Zed queries (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
philrz authored Jun 6, 2023
1 parent 643650e commit 7bb5640
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 119 deletions.
7 changes: 2 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
# Changelog

## 1.0.0 (Unreleased)

Initial release.
## 1.0.0
* Initial release
227 changes: 113 additions & 114 deletions src/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import {
FieldType,
DateTime,
} from '@grafana/data';
import { getBackendSrv, getTemplateSrv } from '@grafana/runtime';
import { getTemplateSrv } from '@grafana/runtime';
import { MyQuery, MyDataSourceOptions } from './types';
import { lastValueFrom } from 'rxjs';
import { Client } from '@brimdata/zed-js';
import { TypeRecord, Client } from '@brimdata/zed-js';

export class DataSource extends DataSourceApi<MyQuery, MyDataSourceOptions> {
url: string;
Expand All @@ -27,154 +26,154 @@ export class DataSource extends DataSourceApi<MyQuery, MyDataSourceOptions> {
const timeField = query.timeField || 'ts';
const rangeFrom = from.toISOString();
const rangeTo = to.toISOString();
const zedClient = new Client(this.url);

if (pool === undefined) {
const poolsObservable = getBackendSrv().fetch<Array<{}>>({
method: 'POST',
url: this.url + '/query',
data: { query: 'from :pools | cut name' },
});
const pools = await lastValueFrom(poolsObservable);
if (pools.data.length === 0) {
const pools = await zedClient.getPools();
if (pools.length === 0) {
throw new Error('No pools found in lake at ' + this.url);
} else {
throw new Error(
'Pool must be specified in "From". Available pools in lake at ' +
this.url +
': ' +
pools.data
.map((p: { [x: string]: any }) => {
return p['name'];
})
.join()
pools.map(p => { return p['name']; }).join(', ')
);
}
}

const wholeQuery =
'from "' +
pool +
'" | ' +
timeField +
' > ' +
rangeFrom +
' and ' +
timeField +
' < ' +
rangeTo +
' | ' +
zedQuery +
' | sort ' +
timeField;

const wholeQuery = `from "${pool}" | ${timeField} > ${rangeFrom} and ${timeField} < ${rangeTo} | ${zedQuery} | sort ${timeField}`;
const finalQuery = getTemplateSrv().replace(wholeQuery, options.scopedVars, 'csv');

// The Zui app is able to show its "Shapes:" count withut a separate query.
// Once we move the plugin to the Zealot client we should be able to do
// the same.
const shapeQuery = finalQuery + ' | by typeof(this) | count()';
const shapeCountObservable = await getBackendSrv().fetch<Array<{}>>({
method: 'POST',
url: this.url + '/query',
data: { query: shapeQuery },
});
const shapeCount = await lastValueFrom(shapeCountObservable);
if (shapeCount.data.length === 0) {
throw new Error('No data points found to plot in this time range');
} else if (shapeCount.data[0] > 1) {
const resultStream = await zedClient.query(finalQuery);
await resultStream.promise;

// Before we attempt to build a dataframe out of the query response to hand to
// Grafana for plotting, we'll check the shapes that were returned alongside the
// query response.
const shapes = resultStream.shapes as TypeRecord[];

// We'll reject any data that came back with multiple shapes since this makes
// it difficult/impossible to reliably copy the data points from the response
// into columns in Grafana's data frame.
if (shapes.length > 1 ) {
throw new Error('More than one shape detected (consider using "cut" or "fuse")');

// If there were no shapes at all, it means there's no data to plot. This could
// have been caused by one of a few things.
//
// 1. The query/pool are valid, but the user just has the time range pickers in
// Grafana set to a range where there's no data. This is innocuous.
//
// 2. The Time Field is not usable for some reason. A common example of this would
// be if a user imported JSON data where the Time Field actually contains
// string-typed values that look like timestamps, but since they're not of
// the genuine Zed "time" type, the time-based filtering done in the query
// using the values from Grafana's time range picker will fail to locate any
// points. In the future we may come up with some way for the plugin to
// transparently cope in such a situation (grafana-zed-datasource/issues/10)
// but for now we'll just inform the user of what the plugin saw so they can
// shape their data and try again.
//
// In order to tell the difference between these two cases we need to execute
// the following special query focused on just the Time Field.
} else if (shapes.length === 0) {
const timeCheckQuery = `from "${pool}" | union(typeof(${timeField}))`;

const timeCheckStream = await zedClient.query(timeCheckQuery);
await timeCheckStream.promise;
const timeCheckResult = await timeCheckStream.js();

if (timeCheckResult.length === 1 && timeCheckResult[0].length === 1 && timeCheckResult[0][0] === 'time') {
throw new Error('No data points found to plot in this time range');
} else {
throw new Error('Time Field "' + timeField + '" must be Zed <time> type, but detected type(s): ' + timeCheckResult[0].join(', '));
}
}

// Find all the fields that will be added to the data frame. The time
// field is always made the leftmost field since black box testing has
// Find all the fields that will be added to the data frame. The Time
// Field is always made the leftmost field since black box testing has
// indicated that if there's multiple time-typed fields Grafana will use
// the leftmost one.
const frameQuery = finalQuery + ' | head 1 | over this => ( yield {key:key[0],type:typeof(value)} )';
const fieldsInfoObservable = await getBackendSrv().fetch<Array<{}>>({
method: 'POST',
url: this.url + '/query',
data: { query: frameQuery },
});
const fieldsInfo = await lastValueFrom(fieldsInfoObservable);
let frameFields: Array<{ name: string; type: FieldType }> = [];
fieldsInfo.data.forEach((point: any) => {
// Black box testing has shown that a field named the empty string ""
// ends up in Grafana with a confusing name in the legend like
// "Field 2". Therefore we'll handle it as a special case.
if (point.key === '') {
point.key = '(empty string)';
}
if (shapes[0] && shapes[0].fields) {
shapes[0].fields.filter((f) => f.type.kind === "primitive").map(f => {

if (point.key === timeField) {
frameFields.unshift({ name: point.key, type: FieldType.time });
} else if (
point.type === '<uint8>' ||
point.type === '<uint16>' ||
point.type === '<uint32>' ||
point.type === '<uint64>' ||
point.type === '<uint128>' ||
point.type === '<uint256>' ||
point.type === '<int8>' ||
point.type === '<int16>' ||
point.type === '<int32>' ||
point.type === '<int64>' ||
point.type === '<int128>' ||
point.type === '<int256>' ||
point.type === '<float16>' ||
point.type === '<float32>' ||
point.type === '<float64>' ||
point.type === '<float128>' ||
point.type === '<float256>' ||
point.type === '<decimal32>' ||
point.type === '<decimal64>' ||
point.type === '<decimal128>' ||
point.type === '<decimal256>'
) {
frameFields.push({ name: point.key, type: FieldType.number });
} else if (
point.type === '<string>' ||
point.type === '<ip>' ||
point.type === '<net>' ||
point.type === '<type>' ||
point.type === '<bytes>' ||
point.type === '<duration>'
) {
frameFields.push({ name: point.key, type: FieldType.string });
} else if (point.type === '<time>') {
frameFields.push({ name: point.key, type: FieldType.time });
} else if (point.type === '<bool>') {
frameFields.push({ name: point.key, type: FieldType.boolean });
}
});
if (!('name' in f.type)) {
throw new Error('Fatal error - Query response contains a Zed type with no name (please open an issue at https://github.com/brimdata/grafana-zed-datasource/issues)');
}

const resultObservable = await getBackendSrv().fetch<Array<{}>>({
method: 'POST',
url: this.url + '/query',
data: { query: finalQuery },
});
// Black box testing has shown that a field named the empty string ""
// ends up in Grafana with a confusing name in the legend like
// "Field 2". Therefore we'll handle it as a special case.
if (f.name === '') {
f.name = '(empty string)';
}

const result = await lastValueFrom(resultObservable);
return { frameFields: frameFields, response: result };
if (f.name === timeField) {
frameFields.unshift({ name: f.name, type: FieldType.time });
} else if (
f.type.name === 'uint16' ||
f.type.name === 'uint32' ||
f.type.name === 'uint64' ||
f.type.name === 'uint128' ||
f.type.name === 'uint256' ||
f.type.name === 'int8' ||
f.type.name === 'int16' ||
f.type.name === 'int32' ||
f.type.name === 'int64' ||
f.type.name === 'int128' ||
f.type.name === 'int256' ||
f.type.name === 'float16' ||
f.type.name === 'float32' ||
f.type.name === 'float64' ||
f.type.name === 'float128' ||
f.type.name === 'float256' ||
f.type.name === 'decimal32' ||
f.type.name === 'decimal64' ||
f.type.name === 'decimal128' ||
f.type.name === 'decimal256'
) {
frameFields.push({ name: f.name, type: FieldType.number });
} else if (
f.type.name === 'string' ||
f.type.name === 'ip' ||
f.type.name === 'net' ||
f.type.name === 'type' ||
f.type.name === 'bytes' ||
f.type.name === 'duration'
) {
frameFields.push({ name: f.name, type: FieldType.string });
} else if (f.type.name === 'time') {
frameFields.push({ name: f.name, type: FieldType.time });
} else if (f.type.name === 'bool') {
frameFields.push({ name: f.name, type: FieldType.boolean });
}
});

return { frameFields: frameFields, response: await resultStream.js() };

// We don't expect to reach this spot. The "if" above was only there to
// make TypeScript happy.
} else {
throw new Error('Fatal error - Unknown problem with data shape (please open an issue at https://github.com/brimdata/grafana-zed-datasource/issues)');
}
}

async query(options: DataQueryRequest<MyQuery>): Promise<DataQueryResponse> {
const { range } = options;

const promises = options.targets.map((query) =>
this.doRequest(query, range!.from, range!.to, options).then((r) => {
const timeField = query.timeField || 'ts';

const frame = new MutableDataFrame({
refId: query.refId,
fields: r.frameFields,
});

r.response.data.forEach((point: any) => {
r.response.forEach((point: any) => {
frame.appendRow(
r.frameFields.map(function (f) {
if (f.name === timeField) {
return +new Date(point[f.name]);
} else if (f.name === '(empty string)') {
if (f.name === '(empty string)') {
return point[''];
} else {
return point[f.name];
Expand Down

0 comments on commit 7bb5640

Please sign in to comment.