Skip to content

Commit

Permalink
feat: ksql and rollup pre-aggregations (#8619)
Browse files Browse the repository at this point in the history
Added support for pre-aggregations for ksqlDB using a select statement and direct load from Kafka
  • Loading branch information
RusovDmitriy committed Sep 11, 2024
1 parent 70ff901 commit cdfbd1e
Show file tree
Hide file tree
Showing 21 changed files with 594 additions and 36 deletions.
2 changes: 2 additions & 0 deletions packages/cubejs-base-driver/src/driver.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export interface StreamTableData extends DownloadTableBase {
export interface StreamingSourceTableData extends DownloadTableBase {
streamingTable: string;
selectStatement?: string;
sourceTable?: any,
partitions?: number;
streamOffset?: string;
streamingSource: {
Expand Down Expand Up @@ -130,6 +131,7 @@ export type StreamOptions = {

/**
 * Optional settings for a streaming source request. Both fields may be omitted.
 */
export type StreamingSourceOptions = {
  /**
   * NOTE(review): declared as `boolean` here, while
   * `StreamingSourceTableData.streamOffset` in this file is a `string` —
   * confirm which type is intended.
   */
  streamOffset?: boolean;
  /** Optional list of column descriptors for the produced output. */
  outputColumnTypes?: TableColumn[];
};

export interface DownloadQueryResultsBase {
Expand Down
5 changes: 5 additions & 0 deletions packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type CreateTableOptions = {
files?: string[]
aggregations?: string
selectStatement?: string
sourceTable?: any
sealAt?: string
delimiter?: string
};
Expand Down Expand Up @@ -118,6 +119,9 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
if (options.selectStatement) {
withEntries.push(`select_statement = ${escape(options.selectStatement)}`);
}
if (options.sourceTable) {
withEntries.push(`source_table = ${escape(`CREATE TABLE ${options.sourceTable.tableName} (${options.sourceTable.types.map(t => `${t.name} ${this.fromGenericType(t.type)}`).join(', ')})`)}`);
}
if (options.streamOffset) {
withEntries.push(`stream_offset = '${options.streamOffset}'`);
}
Expand Down Expand Up @@ -431,6 +435,7 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
indexes,
files: locations,
selectStatement: tableData.selectStatement,
sourceTable: tableData.sourceTable,
streamOffset: tableData.streamOffset,
sealAt
};
Expand Down
41 changes: 29 additions & 12 deletions packages/cubejs-ksql-driver/src/KsqlDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
} from '@cubejs-backend/shared';
import {
BaseDriver, DriverCapabilities,
DriverInterface, QueryOptions,
DriverInterface, TableColumn,
} from '@cubejs-backend/base-driver';
import { Kafka } from 'kafkajs';
import sqlstring, { format as formatSql } from 'sqlstring';
Expand Down Expand Up @@ -64,6 +64,12 @@ type KsqlDescribeResponse = {
}
};

/**
 * Per-call options accepted by the ksql driver's `query` and
 * `getStreamingTableData` methods. Every field is optional.
 */
type KsqlQueryOptions = {
  /** Select statement returned alongside the streaming table data. */
  selectStatement?: string;
  /** Stream offset returned alongside the streaming table data. */
  streamOffset?: string;
  /**
   * When provided, reported as the table's column types in place of the
   * types described from the source table.
   */
  outputColumnTypes?: TableColumn[];
};

/**
* KSQL driver class.
*/
Expand Down Expand Up @@ -161,7 +167,7 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {
}
}

public async query<R = unknown>(query: string, values?: unknown[], options: { streamOffset?: string } = {}): Promise<R> {
public async query<R = unknown>(query: string, values?: unknown[], options: KsqlQueryOptions = {}): Promise<R> {
if (query.toLowerCase().startsWith('select')) {
throw new Error('Select queries for ksql allowed only from Cube Store. In order to query ksql create pre-aggregation first.');
}
Expand Down Expand Up @@ -261,13 +267,15 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
public loadPreAggregationIntoTable(preAggregationTableName: string, loadSql: string, params: any[], options: any): Promise<any> {
return this.query(loadSql.replace(preAggregationTableName, this.tableDashName(preAggregationTableName)), params, { streamOffset: options?.streamOffset });
public loadPreAggregationIntoTable(preAggregationTableName: string, loadSql: string, params: any[], options: KsqlQueryOptions): Promise<any> {
const { streamOffset } = options || {};
return this.query(loadSql.replace(preAggregationTableName, this.tableDashName(preAggregationTableName)), params, { streamOffset });
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
public async downloadTable(table: string, options: any): Promise<any> {
return this.getStreamingTableData(this.tableDashName(table), { streamOffset: options?.streamOffset });
const { streamOffset } = options || {};
return this.getStreamingTableData(this.tableDashName(table), { streamOffset });
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand All @@ -278,11 +286,12 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {
}

const selectStatement = sqlstring.format(query, params);
return this.getStreamingTableData(table, { selectStatement, streamOffset: options?.streamOffset });
const { streamOffset, outputColumnTypes } = options || {};
return this.getStreamingTableData(table, { selectStatement, streamOffset, outputColumnTypes });
}

private async getStreamingTableData(streamingTable: string, options: { selectStatement?: string, streamOffset?: string } = {}) {
const { selectStatement, streamOffset } = options;
private async getStreamingTableData(streamingTable: string, options: KsqlQueryOptions = {}) {
const { selectStatement, streamOffset, outputColumnTypes } = options;
const describe = await this.describeTable(streamingTable);
const name = this.config.streamingSourceName || 'default';
const kafkaDirectDownload = !!this.config.kafkaHost;
Expand All @@ -304,13 +313,20 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {
url: this.config.url
}
};
const sourceTableTypes = await this.tableColumnTypes(streamingTable, describe);
streamingTable = kafkaDirectDownload ? describe.sourceDescription?.topic : streamingTable;

return {
types: await this.tableColumnTypes(streamingTable, describe),
types: outputColumnTypes || sourceTableTypes,
partitions: describe.sourceDescription?.partitions,
streamingTable: kafkaDirectDownload ? describe.sourceDescription?.topic : streamingTable,
streamingTable,
streamOffset,
selectStatement,
streamingSource
streamingSource,
sourceTable: outputColumnTypes ? {
types: sourceTableTypes,
tableName: streamingTable
} : null
};
}

Expand Down Expand Up @@ -344,7 +360,8 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {

public capabilities(): DriverCapabilities {
return {
streamingSource: true
streamingSource: true,
unloadWithoutTempTable: true,
};
}
}
6 changes: 5 additions & 1 deletion packages/cubejs-ksql-driver/src/KsqlQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ export class KsqlQuery extends BaseQuery {
return `\`${name}\``;
}

/**
 * Wraps a SQL expression in a cast to `varchar(255)`.
 *
 * @param sql SQL expression to cast; inserted verbatim.
 * @returns The `CAST(... as varchar(255))` expression.
 */
public castToString(sql: string) {
  const targetType = 'varchar(255)';
  return `CAST(${sql} as ${targetType})`;
}

/**
 * Builds a `CONCAT(...)` call over the given SQL expressions.
 *
 * @param strings SQL expressions to concatenate, in order; inserted verbatim.
 * @returns The `CONCAT(a, b, ...)` expression.
 */
public concatStringsSql(strings: string[]) {
  const argumentList = strings.join(', ');
  return `CONCAT(${argumentList})`;
}
Expand Down Expand Up @@ -111,7 +115,7 @@ export class KsqlQuery extends BaseQuery {
}

public static extractTableFromSimpleSelectAsteriskQuery(sql: string) {
const match = sql.match(/^\s*select\s+\*\s+from\s+([a-zA-Z0-9_\-`".*]+)\s*/i);
const match = sql.replace(/\n/g, ' ').match(/^\s*select\s+.*\s+from\s+([a-zA-Z0-9_\-`".*]+)\s*/i);
return match && match[1];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ function getStructureVersion(preAggregation) {
if (preAggregation.streamOffset) {
versionArray.push(preAggregation.streamOffset);
}
if (preAggregation.outputColumnTypes) {
versionArray.push(preAggregation.outputColumnTypes);
}

return version(versionArray.length === 1 ? versionArray[0] : versionArray);
}
Expand Down Expand Up @@ -815,6 +818,9 @@ export class PreAggregationLoader {
if (this.preAggregation.streamOffset) {
versionArray.push(this.preAggregation.streamOffset);
}
if (this.preAggregation.outputColumnTypes) {
versionArray.push(this.preAggregation.outputColumnTypes);
}
versionArray.push(invalidationKeys);
return version(versionArray);
}
Expand Down Expand Up @@ -964,7 +970,11 @@ export class PreAggregationLoader {
targetTableName,
query,
params,
{ streamOffset: this.preAggregation.streamOffset, ...queryOptions }
{
streamOffset: this.preAggregation.streamOffset,
outputColumnTypes: this.preAggregation.outputColumnTypes,
...queryOptions
}
));

await this.createIndexes(client, newVersionEntry, saveCancelFn, queryOptions);
Expand Down Expand Up @@ -1107,7 +1117,11 @@ export class PreAggregationLoader {
targetTableName,
query,
params,
{ streamOffset: this.preAggregation.streamOffset, ...queryOptions }
{
streamOffset: this.preAggregation.streamOffset,
outputColumnTypes: this.preAggregation.outputColumnTypes,
...queryOptions
}
));

return queryOptions;
Expand Down Expand Up @@ -1156,6 +1170,7 @@ export class PreAggregationLoader {
sql,
params, {
streamOffset: this.preAggregation.streamOffset,
outputColumnTypes: this.preAggregation.outputColumnTypes,
...queryOptions,
...capabilities,
...this.getStreamingOptions(),
Expand Down Expand Up @@ -1261,7 +1276,11 @@ export class PreAggregationLoader {
tableData.rowStream = stream;
}
} else {
tableData = await saveCancelFn(client.downloadTable(table, { streamOffset: this.preAggregation.streamOffset, ...externalDriverCapabilities }));
tableData = await saveCancelFn(client.downloadTable(table, {
streamOffset: this.preAggregation.streamOffset,
outputColumnTypes: this.preAggregation.outputColumnTypes,
...externalDriverCapabilities
}));
}

if (!tableData.types) {
Expand Down
63 changes: 60 additions & 3 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -2746,9 +2746,12 @@ export class BaseQuery {
}

newSubQueryForCube(cube, options) {
return this.options.queryFactory
? this.options.queryFactory.createQuery(cube, this.compilers, this.subQueryOptions(options))
: this.newSubQuery(options);
if (this.options.queryFactory) {
options.paramAllocator = null;
return this.options.queryFactory.createQuery(cube, this.compilers, this.subQueryOptions(options));
}

return this.newSubQuery(options);
}

subQueryOptions(options) {
Expand Down Expand Up @@ -2942,6 +2945,60 @@ export class BaseQuery {
);
}

preAggregationOutputColumnTypes(cube, preAggregation) {
return this.cacheValue(
['preAggregationOutputColumnTypes', cube, JSON.stringify(preAggregation)],
() => {
if (!preAggregation.outputColumnTypes) {
return null;
}

if (preAggregation.type === 'rollup') {
const query = this.preAggregations.rollupPreAggregationQuery(cube, preAggregation);

const evaluatedMapOutputColumnTypes = preAggregation.outputColumnTypes.reduce((acc, outputColumnType) => {
acc.set(outputColumnType.name, outputColumnType);
return acc;
}, new Map());

const findSchemaType = member => {
const outputSchemaType = evaluatedMapOutputColumnTypes.get(member);
if (!outputSchemaType) {
throw new UserError(`Output schema type for ${member} not found in pre-aggregation ${preAggregation}`);
}

return {
name: this.aliasName(member),
type: outputSchemaType.type,
};
};

// The order of the output columns is important, it should match the order in the select statement
const outputColumnTypes = [
...(query.dimensions || []).map(d => findSchemaType(d.dimension)),
...(query.timeDimensions || []).map(t => ({
name: `${this.aliasName(t.dimension)}_${t.granularity}`,
type: 'TIMESTAMP'
})),
...(query.measures || []).map(m => findSchemaType(m.measure)),
];

return outputColumnTypes;
}
throw new UserError('Output schema is only supported for rollup pre-aggregations');
},
{ inputProps: { }, cache: this.queryCache }
);
}

preAggregationUniqueKeyColumns(cube, preAggregation) {
if (preAggregation.uniqueKeyColumns) {
return preAggregation.uniqueKeyColumns.map(key => this.aliasName(`${cube}.${key}`));
}

return this.dimensionColumns();
}

/**
 * Reports whether a pre-aggregation is read-only. This implementation always
 * returns `false` and ignores both arguments.
 * NOTE(review): presumably a hook for dialect-specific query classes to
 * override — confirm against subclasses.
 */
preAggregationReadOnly(_cube, _preAggregation) {
return false;
}
Expand Down
21 changes: 13 additions & 8 deletions packages/cubejs-schema-compiler/src/adapter/PreAggregations.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import R from 'ramda';
import { FROM_PARTITION_RANGE, TO_PARTITION_RANGE } from '@cubejs-backend/shared';

import { UserError } from '../compiler/UserError';

Expand Down Expand Up @@ -189,7 +190,7 @@ export class PreAggregations {

const uniqueKeyColumnsDefault = () => null;
const uniqueKeyColumns = ({
rollup: () => queryForSqlEvaluation.dimensionColumns(),
rollup: () => queryForSqlEvaluation.preAggregationUniqueKeyColumns(cube, preAggregation),
originalSql: () => preAggregation.uniqueKeyColumns || null
}[preAggregation.type] || uniqueKeyColumnsDefault)();

Expand All @@ -209,6 +210,7 @@ export class PreAggregations {
preAggregationsSchema: queryForSqlEvaluation.preAggregationSchema(),
loadSql: queryForSqlEvaluation.preAggregationLoadSql(cube, preAggregation, tableName),
sql: queryForSqlEvaluation.preAggregationSql(cube, preAggregation),
outputColumnTypes: queryForSqlEvaluation.preAggregationOutputColumnTypes(cube, preAggregation),
uniqueKeyColumns,
aggregationsColumns,
dataSource: queryForSqlEvaluation.dataSource,
Expand All @@ -219,7 +221,7 @@ export class PreAggregations {
queryForSqlEvaluation.parseSecondDuration(preAggregation.refreshKey.updateWindow),
preAggregationStartEndQueries:
(preAggregation.partitionGranularity || references.timeDimensions[0]?.granularity) &&
this.refreshRangeQuery().preAggregationStartEndQueries(cube, preAggregation),
this.refreshRangeQuery(cube).preAggregationStartEndQueries(cube, preAggregation),
matchedTimeDimensionDateRange:
preAggregation.partitionGranularity && (
matchedTimeDimension && matchedTimeDimension.boundaryDateRangeFormatted() ||
Expand Down Expand Up @@ -1041,12 +1043,15 @@ export class PreAggregations {
return { preAggregations, result };
}

refreshRangeQuery() {
return this.query.newSubQuery({
rowLimit: null,
offset: null,
preAggregationQuery: true,
});
refreshRangeQuery(cube) {
return this.query.newSubQueryForCube(
cube,
{
rowLimit: null,
offset: null,
preAggregationQuery: true,
}
);
}

originalSqlPreAggregationQuery(cube, aggregation) {
Expand Down
6 changes: 6 additions & 0 deletions packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ export class CubeEvaluator extends CubeSymbols {
preAggregation.refreshRangeEnd = preAggregation.buildRangeEnd;
delete preAggregation.buildRangeEnd;
}

if (preAggregation.outputColumnTypes) {
preAggregation.outputColumnTypes.forEach(column => {
column.name = this.evaluateReferences(cube.name, column.member, { originalSorting: true });
});
}
}
}
}
Expand Down
Loading

0 comments on commit cdfbd1e

Please sign in to comment.