Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for vdm next to fb custom audiences #3729

Merged
merged 6 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6,115 changes: 1,642 additions & 4,473 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
"@types/koa-bodyparser": "^4.3.10",
"@types/lodash": "^4.14.197",
"@types/node": "^20.2.5",
"@types/supertest": "^6.0.2",
"@typescript-eslint/eslint-plugin": "^5.61.0",
"@typescript-eslint/parser": "^5.59.2",
"axios-mock-adapter": "^1.22.0",
Expand Down
11 changes: 11 additions & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ type Destination = {
IsConnectionEnabled?: boolean;
};

type Connection = {
sourceId: string;
destinationId: string;
enabled: boolean;
config: Record<string, unknown>;
processorEnabled?: boolean;
koladilip marked this conversation as resolved.
Show resolved Hide resolved
};

type UserTransformationLibrary = {
VersionID: string;
};
Expand All @@ -151,6 +159,7 @@ type ProcessorTransformationRequest = {
message: object;
metadata: Metadata;
destination: Destination;
connection?: Connection;
libraries?: UserTransformationLibrary[];
credentials?: Credential[];
};
Expand All @@ -160,6 +169,7 @@ type RouterTransformationRequestData = {
message: object;
metadata: Metadata;
destination: Destination;
connection?: Connection;
};

type RouterTransformationRequest = {
Expand Down Expand Up @@ -350,6 +360,7 @@ export {
DeliveryJobState,
DeliveryV0Response,
DeliveryV1Response,
Connection,
Destination,
ErrorDetailer,
MessageIdMetadataMap,
Expand Down
6 changes: 4 additions & 2 deletions src/v0/destinations/fb_custom_audience/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ const subTypeFields = [
'CONTACT_IMPORTER',
'DATA_FILE',
];
// as per real time experimentation maximum 500 users can be added at a time
// const MAX_USER_COUNT = 500; (using from destination definition)

const USER_ADD = 'add';
const USER_DELETE = 'remove';
// https://developers.facebook.com/docs/marketing-api/audiences/guides/custom-audiences/
const MAX_USER_COUNT = 10000;
/* No official Documentation is available for this but using trial
and error method we found that 65000 bytes is the maximum payload allowed size but we are 60000 just to be sure batching is done properly
*/
Expand All @@ -102,6 +103,7 @@ module.exports = {
schemaFields,
USER_ADD,
USER_DELETE,
MAX_USER_COUNT,
typeFields,
subTypeFields,
maxPayloadSize,
Expand Down
140 changes: 90 additions & 50 deletions src/v0/destinations/fb_custom_audience/recordTransform.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/* eslint-disable no-const-assign */
const lodash = require('lodash');
const get = require('get-value');
const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib');
const { schemaFields } = require('./config');
const { MappedToDestinationKey } = require('../../../constants');
const { schemaFields, MAX_USER_COUNT } = require('./config');
const stats = require('../../../util/stats');
const {
getDestinationExternalIDInfoForRetl,
isDefinedAndNotNullAndNotEmpty,
checkSubsetOfArray,
returnArrayOfSubarrays,
getSuccessRespEvents,
isEventSentByVDMV2Flow,
isEventSentByVDMV1Flow,
} = require('../../util');
const { getErrorResponse, createFinalResponse } = require('../../util/recordUtils');
const {
Expand All @@ -20,6 +20,7 @@ const {
batchingWithPayloadSize,
responseBuilderSimple,
getDataSource,
generateAppSecretProof,
} = require('./util');

const processRecordEventArray = (
Expand All @@ -31,7 +32,7 @@ const processRecordEventArray = (
prepareParams,
destination,
operation,
operationAudienceId,
audienceId,
) => {
const toSendEvents = [];
const metadata = [];
Expand Down Expand Up @@ -88,7 +89,7 @@ const processRecordEventArray = (
operationCategory: operation,
};

const builtResponse = responseBuilderSimple(wrappedResponse, operationAudienceId);
const builtResponse = responseBuilderSimple(wrappedResponse, audienceId);

toSendEvents.push(builtResponse);
});
Expand All @@ -99,49 +100,26 @@ const processRecordEventArray = (
return response;
};

async function processRecordInputs(groupedRecordInputs) {
const { destination } = groupedRecordInputs[0];
const { message } = groupedRecordInputs[0];
const {
isHashRequired,
accessToken,
disableFormat,
type,
subType,
isRaw,
maxUserCount,
audienceId,
} = destination.Config;
function preparePayload(events, config) {
const { audienceId, userSchema, isRaw, type, subType, isHashRequired, disableFormat } = config;
const { destination } = events[0];
const { accessToken, appSecret } = destination.Config;
const prepareParams = {
access_token: accessToken,
};

// maxUserCount validation
const maxUserCountNumber = parseInt(maxUserCount, 10);
if (Number.isNaN(maxUserCountNumber)) {
throw new ConfigurationError('Batch size must be an Integer.');
if (isDefinedAndNotNullAndNotEmpty(appSecret)) {
const dateNow = Date.now();
prepareParams.appsecret_time = Math.floor(dateNow / 1000); // Get current Unix time in seconds
prepareParams.appsecret_proof = generateAppSecretProof(accessToken, appSecret, dateNow);
}

// audience id validation
let operationAudienceId = audienceId;
const mappedToDestination = get(message, MappedToDestinationKey);
if (mappedToDestination) {
const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE');
operationAudienceId = objectType;
}
if (!isDefinedAndNotNullAndNotEmpty(operationAudienceId)) {
throw new ConfigurationError('Audience ID is a mandatory field');
}
const cleanUserSchema = userSchema.map((field) => field.trim());

// user schema validation
let { userSchema } = destination.Config;
if (mappedToDestination) {
userSchema = getSchemaForEventMappedToDest(message);
}
if (!Array.isArray(userSchema)) {
userSchema = [userSchema];
if (!isDefinedAndNotNullAndNotEmpty(audienceId)) {
throw new ConfigurationError('Audience ID is a mandatory field');
}
if (!checkSubsetOfArray(schemaFields, userSchema)) {
if (!checkSubsetOfArray(schemaFields, cleanUserSchema)) {
throw new ConfigurationError('One or more of the schema fields are not supported');
}

Expand All @@ -156,7 +134,7 @@ async function processRecordInputs(groupedRecordInputs) {
paramsPayload.data_source = dataSource;
}

const groupedRecordsByAction = lodash.groupBy(groupedRecordInputs, (record) =>
const groupedRecordsByAction = lodash.groupBy(events, (record) =>
record.message.action?.toLowerCase(),
);

Expand All @@ -167,55 +145,55 @@ async function processRecordInputs(groupedRecordInputs) {
if (groupedRecordsByAction.delete) {
const deleteRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.delete,
maxUserCountNumber,
MAX_USER_COUNT,
);
deleteResponse = processRecordEventArray(
deleteRecordChunksArray,
userSchema,
cleanUserSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'remove',
operationAudienceId,
audienceId,
);
}

if (groupedRecordsByAction.insert) {
const insertRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.insert,
maxUserCountNumber,
MAX_USER_COUNT,
);

insertResponse = processRecordEventArray(
insertRecordChunksArray,
userSchema,
cleanUserSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'add',
operationAudienceId,
audienceId,
);
}

if (groupedRecordsByAction.update) {
const updateRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.update,
maxUserCountNumber,
MAX_USER_COUNT,
);
updateResponse = processRecordEventArray(
updateRecordChunksArray,
userSchema,
cleanUserSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'add',
operationAudienceId,
audienceId,
);
}

Expand All @@ -225,6 +203,7 @@ async function processRecordInputs(groupedRecordInputs) {
deleteResponse,
insertResponse,
updateResponse,

errorResponse,
);
if (finalResponse.length === 0) {
Expand All @@ -235,6 +214,67 @@ async function processRecordInputs(groupedRecordInputs) {
return finalResponse;
}

function processRecordInputsV1(groupedRecordInputs) {
const { destination } = groupedRecordInputs[0];
const { message } = groupedRecordInputs[0];
const { isHashRequired, disableFormat, type, subType, isRaw, audienceId, userSchema } =
destination.Config;

let operationAudienceId = audienceId;
let updatedUserSchema = userSchema;
if (isEventSentByVDMV1Flow(groupedRecordInputs[0])) {
koladilip marked this conversation as resolved.
Show resolved Hide resolved
const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE');
operationAudienceId = objectType;
updatedUserSchema = getSchemaForEventMappedToDest(message);
}

return preparePayload(groupedRecordInputs, {
audienceId: operationAudienceId,
userSchema: updatedUserSchema,
isRaw,
type,
subType,
isHashRequired,
disableFormat,
});
}

const processRecordInputsV2 = (groupedRecordInputs) => {
const { connection, message } = groupedRecordInputs[0];
koladilip marked this conversation as resolved.
Show resolved Hide resolved
const { isHashRequired, disableFormat, type, subType, isRaw, audienceId } =
connection.config.destination;
// Ref: https://www.notion.so/rudderstacks/VDM-V2-Final-Config-and-Record-EventPayload-8cc80f3d88ad46c7bc43df4b87a0bbff
const identifiers = message?.identifiers;
let userSchema;
if (identifiers) {
userSchema = Object.keys(identifiers);
krishna2020 marked this conversation as resolved.
Show resolved Hide resolved
}
const events = groupedRecordInputs.map((record) => ({
...record,
message: {
...record.message,
fields: record.message.identifiers,
},
}));
return preparePayload(events, {
audienceId,
userSchema,
isRaw,
type,
subType,
isHashRequired,
disableFormat,
});
};

function processRecordInputs(groupedRecordInputs) {
const event = groupedRecordInputs[0];
if (isEventSentByVDMV2Flow(event)) {
koladilip marked this conversation as resolved.
Show resolved Hide resolved
return processRecordInputsV2(groupedRecordInputs);
}
return processRecordInputsV1(groupedRecordInputs);
}

module.exports = {
processRecordInputs,
};
Loading
Loading