Merge branch 'develop' into feat.snowpipe-streaming
achettyiitr committed Sep 22, 2024
2 parents 5b0fdbd + 27040b0 commit dc679b7
Showing 24 changed files with 3,018 additions and 57,252 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,24 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

## [1.78.0](https://github.com/rudderlabs/rudder-transformer/compare/v1.77.1...v1.78.0) (2024-09-16)


### Features

* add source id isolation for reverse etl ([#3496](https://github.com/rudderlabs/rudder-transformer/issues/3496)) ([b4f4dd1](https://github.com/rudderlabs/rudder-transformer/commit/b4f4dd1c43f8fd4b4f744413d79cfe8f5f77708b))
* add util for applying json string template ([#3699](https://github.com/rudderlabs/rudder-transformer/issues/3699)) ([b2f5654](https://github.com/rudderlabs/rudder-transformer/commit/b2f56540148066a40770e1506faadb4f3f5a296b))
* onboard X (Twitter) Audience ([#3696](https://github.com/rudderlabs/rudder-transformer/issues/3696)) ([f77d2ab](https://github.com/rudderlabs/rudder-transformer/commit/f77d2ab4125a1a44bba95bebbee25bb4fac032da))
* webhook v2 path variables support ([#3705](https://github.com/rudderlabs/rudder-transformer/issues/3705)) ([f7783d8](https://github.com/rudderlabs/rudder-transformer/commit/f7783d8fb30093a847f450ee7ddd9449f272b112))


### Bug Fixes

* added support for window stabilization fix through envs ([#3720](https://github.com/rudderlabs/rudder-transformer/issues/3720)) ([8dcf1b3](https://github.com/rudderlabs/rudder-transformer/commit/8dcf1b3c5aff424d39bece8c9912ec4a1eb221ea))
* circular json bugsnag ([#3713](https://github.com/rudderlabs/rudder-transformer/issues/3713)) ([263d075](https://github.com/rudderlabs/rudder-transformer/commit/263d0758be828402068278c7c5356b65119e7e9a))
* error messages in gaec ([#3702](https://github.com/rudderlabs/rudder-transformer/issues/3702)) ([441fb57](https://github.com/rudderlabs/rudder-transformer/commit/441fb57a537592cc1cc45dc8f31fa8be98e18320))
* gaoc bugsnag alert for email.trim ([#3693](https://github.com/rudderlabs/rudder-transformer/issues/3693)) ([6ba16f6](https://github.com/rudderlabs/rudder-transformer/commit/6ba16f6e5d9bb0de773ebcb8be13a78af325a4a1))

### [1.77.1](https://github.com/rudderlabs/rudder-transformer/compare/v1.77.0...v1.77.1) (2024-09-05)


6,119 changes: 1,644 additions & 4,475 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "rudder-transformer",
"version": "1.77.1",
"version": "1.78.0",
"description": "",
"homepage": "https://github.com/rudderlabs/rudder-transformer#readme",
"bugs": {
@@ -131,6 +131,7 @@
"@types/koa-bodyparser": "^4.3.10",
"@types/lodash": "^4.14.197",
"@types/node": "^20.2.5",
"@types/supertest": "^6.0.2",
"@typescript-eslint/eslint-plugin": "^5.61.0",
"@typescript-eslint/parser": "^5.59.2",
"axios-mock-adapter": "^1.22.0",
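The new `@types/supertest` dev dependency adds typings for the supertest client used in component tests. Below is a minimal sketch of the kind of typed call it enables; the app handle, route, and payload are hypothetical placeholders, not taken from this repository's tests.

```typescript
import request from 'supertest';
import type Koa from 'koa';

// Hypothetical app handle; real tests would import the transformer's Koa app instead.
declare const app: Koa;

const runSmokeCheck = async (): Promise<void> => {
  await request(app.callback()) // supertest accepts a request listener function
    .post('/v0/destinations/fb_custom_audience') // hypothetical route
    .send([{ message: {}, destination: {}, metadata: {} }]) // hypothetical payload
    .expect(200);
};
```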
11 changes: 11 additions & 0 deletions src/types/index.ts
@@ -135,6 +135,14 @@ type Destination = {
IsConnectionEnabled?: boolean;
};

type Connection = {
sourceId: string;
destinationId: string;
enabled: boolean;
config: Record<string, unknown>;
processorEnabled?: boolean;
};

type UserTransformationLibrary = {
VersionID: string;
};
@@ -151,6 +159,7 @@ type ProcessorTransformationRequest = {
message: object;
metadata: Metadata;
destination: Destination;
connection?: Connection;
libraries?: UserTransformationLibrary[];
credentials?: Credential[];
};
@@ -160,6 +169,7 @@ type RouterTransformationRequestData = {
message: object;
metadata: Metadata;
destination: Destination;
connection?: Connection;
};

type RouterTransformationRequest = {
@@ -350,6 +360,7 @@ export {
DeliveryJobState,
DeliveryV0Response,
DeliveryV1Response,
Connection,
Destination,
ErrorDetailer,
MessageIdMetadataMap,
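For reference, a minimal sketch of how the new optional `connection` field might be populated; the shape follows the `Connection` type added above, while every concrete value is a hypothetical placeholder.

```typescript
import type { Connection } from './types'; // illustrative import path

// Sketch only: all IDs and config values below are hypothetical.
const connection: Connection = {
  sourceId: 'src-123',
  destinationId: 'dest-456',
  enabled: true,
  processorEnabled: true,
  config: {
    // fb_custom_audience's record flow reads connection.config.destination (see recordTransform.js below)
    destination: { audienceId: '1234567890' },
  },
};
```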
6 changes: 4 additions & 2 deletions src/v0/destinations/fb_custom_audience/config.js
@@ -89,10 +89,11 @@ const subTypeFields = [
'CONTACT_IMPORTER',
'DATA_FILE',
];
// per real-time experimentation, a maximum of 500 users can be added at a time
// const MAX_USER_COUNT = 500; (using from destination definition)

const USER_ADD = 'add';
const USER_DELETE = 'remove';
// https://developers.facebook.com/docs/marketing-api/audiences/guides/custom-audiences/
const MAX_USER_COUNT = 10000;
/* No official documentation is available for this, but through trial and error we found
that 65000 bytes is the maximum allowed payload size; we use 60000 just to be sure
batching is done properly */
@@ -102,6 +103,7 @@ module.exports = {
schemaFields,
USER_ADD,
USER_DELETE,
MAX_USER_COUNT,
typeFields,
subTypeFields,
maxPayloadSize,
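To make the two limits above concrete, here is a minimal sketch of the two-level batching they imply: chunk records by `MAX_USER_COUNT` first, then keep each serialized payload under the byte budget described in the comment (65000 bytes observed, 60000 used for safety). This is an illustration under those assumptions, not the destination's actual `returnArrayOfSubarrays` / `batchingWithPayloadSize` utilities.

```typescript
const MAX_USER_COUNT = 10000; // matches the new constant above
const MAX_PAYLOAD_BYTES = 60000; // assumed from the comment above

// Cap each chunk at MAX_USER_COUNT rows.
const chunkByCount = <T>(rows: T[], size: number): T[][] => {
  const chunks: T[][] = [];
  for (let i = 0; i < rows.length; i += size) {
    chunks.push(rows.slice(i, i + size));
  }
  return chunks;
};

// Split a chunk further whenever its serialized form would exceed the byte budget.
const chunkByPayloadSize = <T>(rows: T[]): T[][] => {
  const batches: T[][] = [];
  let current: T[] = [];
  for (const row of rows) {
    const candidate = [...current, row];
    if (current.length > 0 && Buffer.byteLength(JSON.stringify(candidate), 'utf8') > MAX_PAYLOAD_BYTES) {
      batches.push(current); // current batch is full; start a new one with this row
      current = [row];
    } else {
      current = candidate;
    }
  }
  if (current.length > 0) batches.push(current);
  return batches;
};

// Usage sketch: chunk by count first, then by payload size.
const batches = chunkByCount([...Array(25000).keys()], MAX_USER_COUNT).flatMap(chunkByPayloadSize);
```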
140 changes: 90 additions & 50 deletions src/v0/destinations/fb_custom_audience/recordTransform.js
@@ -1,16 +1,16 @@
/* eslint-disable no-const-assign */
const lodash = require('lodash');
const get = require('get-value');
const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib');
const { schemaFields } = require('./config');
const { MappedToDestinationKey } = require('../../../constants');
const { schemaFields, MAX_USER_COUNT } = require('./config');
const stats = require('../../../util/stats');
const {
getDestinationExternalIDInfoForRetl,
isDefinedAndNotNullAndNotEmpty,
checkSubsetOfArray,
returnArrayOfSubarrays,
getSuccessRespEvents,
isEventSentByVDMV2Flow,
isEventSentByVDMV1Flow,
} = require('../../util');
const { getErrorResponse, createFinalResponse } = require('../../util/recordUtils');
const {
@@ -20,6 +20,7 @@ const {
batchingWithPayloadSize,
responseBuilderSimple,
getDataSource,
generateAppSecretProof,
} = require('./util');

const processRecordEventArray = (
@@ -31,7 +32,7 @@
prepareParams,
destination,
operation,
operationAudienceId,
audienceId,
) => {
const toSendEvents = [];
const metadata = [];
@@ -88,7 +89,7 @@
operationCategory: operation,
};

const builtResponse = responseBuilderSimple(wrappedResponse, operationAudienceId);
const builtResponse = responseBuilderSimple(wrappedResponse, audienceId);

toSendEvents.push(builtResponse);
});
@@ -99,49 +100,26 @@
return response;
};

async function processRecordInputs(groupedRecordInputs) {
const { destination } = groupedRecordInputs[0];
const { message } = groupedRecordInputs[0];
const {
isHashRequired,
accessToken,
disableFormat,
type,
subType,
isRaw,
maxUserCount,
audienceId,
} = destination.Config;
function preparePayload(events, config) {
const { audienceId, userSchema, isRaw, type, subType, isHashRequired, disableFormat } = config;
const { destination } = events[0];
const { accessToken, appSecret } = destination.Config;
const prepareParams = {
access_token: accessToken,
};

// maxUserCount validation
const maxUserCountNumber = parseInt(maxUserCount, 10);
if (Number.isNaN(maxUserCountNumber)) {
throw new ConfigurationError('Batch size must be an Integer.');
if (isDefinedAndNotNullAndNotEmpty(appSecret)) {
const dateNow = Date.now();
prepareParams.appsecret_time = Math.floor(dateNow / 1000); // Get current Unix time in seconds
prepareParams.appsecret_proof = generateAppSecretProof(accessToken, appSecret, dateNow);
}

// audience id validation
let operationAudienceId = audienceId;
const mappedToDestination = get(message, MappedToDestinationKey);
if (mappedToDestination) {
const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE');
operationAudienceId = objectType;
}
if (!isDefinedAndNotNullAndNotEmpty(operationAudienceId)) {
throw new ConfigurationError('Audience ID is a mandatory field');
}
const cleanUserSchema = userSchema.map((field) => field.trim());

// user schema validation
let { userSchema } = destination.Config;
if (mappedToDestination) {
userSchema = getSchemaForEventMappedToDest(message);
}
if (!Array.isArray(userSchema)) {
userSchema = [userSchema];
if (!isDefinedAndNotNullAndNotEmpty(audienceId)) {
throw new ConfigurationError('Audience ID is a mandatory field');
}
if (!checkSubsetOfArray(schemaFields, userSchema)) {
if (!checkSubsetOfArray(schemaFields, cleanUserSchema)) {
throw new ConfigurationError('One or more of the schema fields are not supported');
}

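`generateAppSecretProof` is imported from `./util` and its body is not part of this diff. For orientation only, a plausible sketch based on Facebook's documented time-based appsecret_proof (an HMAC-SHA256 of `<access_token>|<unix_time_seconds>` keyed with the app secret) is shown below; the argument order and millisecond timestamp input mirror the call site above, but the actual implementation may differ.

```typescript
import { createHmac } from 'crypto';

// Plausible sketch, not the util's real code. Assumes the documented time-based
// appsecret_proof: HMAC-SHA256 of "<access_token>|<appsecret_time>" keyed with
// the app secret, where appsecret_time is Unix time in seconds.
const generateAppSecretProof = (accessToken: string, appSecret: string, dateNow: number): string => {
  const appsecretTime = Math.floor(dateNow / 1000); // same seconds value sent as appsecret_time
  return createHmac('sha256', appSecret)
    .update(`${accessToken}|${appsecretTime}`)
    .digest('hex');
};
```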
@@ -156,7 +134,7 @@
paramsPayload.data_source = dataSource;
}

const groupedRecordsByAction = lodash.groupBy(groupedRecordInputs, (record) =>
const groupedRecordsByAction = lodash.groupBy(events, (record) =>
record.message.action?.toLowerCase(),
);

@@ -167,55 +145,55 @@
if (groupedRecordsByAction.delete) {
const deleteRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.delete,
maxUserCountNumber,
MAX_USER_COUNT,
);
deleteResponse = processRecordEventArray(
deleteRecordChunksArray,
userSchema,
cleanUserSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'remove',
operationAudienceId,
audienceId,
);
}

if (groupedRecordsByAction.insert) {
const insertRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.insert,
maxUserCountNumber,
MAX_USER_COUNT,
);

insertResponse = processRecordEventArray(
insertRecordChunksArray,
userSchema,
cleanUserSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'add',
operationAudienceId,
audienceId,
);
}

if (groupedRecordsByAction.update) {
const updateRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.update,
maxUserCountNumber,
MAX_USER_COUNT,
);
updateResponse = processRecordEventArray(
updateRecordChunksArray,
userSchema,
cleanUserSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'add',
operationAudienceId,
audienceId,
);
}

@@ -225,6 +203,7 @@
deleteResponse,
insertResponse,
updateResponse,

errorResponse,
);
if (finalResponse.length === 0) {
@@ -235,6 +214,67 @@
return finalResponse;
}

function processRecordInputsV1(groupedRecordInputs) {
const { destination } = groupedRecordInputs[0];
const { message } = groupedRecordInputs[0];
const { isHashRequired, disableFormat, type, subType, isRaw, audienceId, userSchema } =
destination.Config;

let operationAudienceId = audienceId;
let updatedUserSchema = userSchema;
if (isEventSentByVDMV1Flow(groupedRecordInputs[0])) {
const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE');
operationAudienceId = objectType;
updatedUserSchema = getSchemaForEventMappedToDest(message);
}

return preparePayload(groupedRecordInputs, {
audienceId: operationAudienceId,
userSchema: updatedUserSchema,
isRaw,
type,
subType,
isHashRequired,
disableFormat,
});
}

const processRecordInputsV2 = (groupedRecordInputs) => {
const { connection, message } = groupedRecordInputs[0];
const { isHashRequired, disableFormat, type, subType, isRaw, audienceId } =
connection.config.destination;
// Ref: https://www.notion.so/rudderstacks/VDM-V2-Final-Config-and-Record-EventPayload-8cc80f3d88ad46c7bc43df4b87a0bbff
const identifiers = message?.identifiers;
let userSchema;
if (identifiers) {
userSchema = Object.keys(identifiers);
}
const events = groupedRecordInputs.map((record) => ({
...record,
message: {
...record.message,
fields: record.message.identifiers,
},
}));
return preparePayload(events, {
audienceId,
userSchema,
isRaw,
type,
subType,
isHashRequired,
disableFormat,
});
};
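To illustrate the reshaping `processRecordInputsV2` performs, here is a trimmed, hypothetical input record and the values the function derives from it; the identifier keys and config values are examples only.

```typescript
// Hypothetical VDM v2 record, trimmed to the parts processRecordInputsV2 reads.
const record = {
  message: {
    action: 'insert',
    identifiers: { EMAIL: '<sha256(email)>', PHONE: '<sha256(phone)>' }, // example pre-hashed values
  },
  connection: {
    config: { destination: { audienceId: '1234567890', isHashRequired: false } }, // example config
  },
};

// The schema is derived from the identifier keys...
const userSchema = Object.keys(record.message.identifiers); // ['EMAIL', 'PHONE']

// ...and each event handed to preparePayload gets `fields` mirroring `identifiers`.
const mappedMessage = { ...record.message, fields: record.message.identifiers };
```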

function processRecordInputs(groupedRecordInputs) {
const event = groupedRecordInputs[0];
if (isEventSentByVDMV2Flow(event)) {
return processRecordInputsV2(groupedRecordInputs);
}
return processRecordInputsV1(groupedRecordInputs);
}

module.exports = {
processRecordInputs,
};