From 9a9aec6b1f6aa26e03a73f046f5a3268a4f922da Mon Sep 17 00:00:00 2001 From: David Durman Date: Fri, 4 Oct 2024 16:25:09 +0200 Subject: [PATCH] facebookbusiness (major update) --- src/appmixer/facebookbusiness/bundle.json | 13 +- .../AddMemberToAudience.js | 44 +++- .../AddMemberToAudience/component.json | 46 ++-- .../AddMembersFromCSVToAudience.js | 219 +++++++++++----- .../component.json | 65 ++++- .../RemoveMemberFromAudience.js | 67 +++++ .../RemoveMemberFromAudience/component.json | 190 ++++++++++++++ .../RemoveMembersInCSVFromAudience.js | 186 ++++++++++++++ .../component.json | 131 ++++++++++ .../ReplaceMembersFromCSVInAudience.js | 235 ++++++++++++++++++ .../component.json | 135 ++++++++++ 11 files changed, 1232 insertions(+), 99 deletions(-) create mode 100644 src/appmixer/facebookbusiness/marketing/RemoveMemberFromAudience/RemoveMemberFromAudience.js create mode 100644 src/appmixer/facebookbusiness/marketing/RemoveMemberFromAudience/component.json create mode 100644 src/appmixer/facebookbusiness/marketing/RemoveMembersInCSVFromAudience/RemoveMembersInCSVFromAudience.js create mode 100644 src/appmixer/facebookbusiness/marketing/RemoveMembersInCSVFromAudience/component.json create mode 100644 src/appmixer/facebookbusiness/marketing/ReplaceMembersFromCSVInAudience/ReplaceMembersFromCSVInAudience.js create mode 100644 src/appmixer/facebookbusiness/marketing/ReplaceMembersFromCSVInAudience/component.json diff --git a/src/appmixer/facebookbusiness/bundle.json b/src/appmixer/facebookbusiness/bundle.json index a95b06b7d..4a5e52633 100644 --- a/src/appmixer/facebookbusiness/bundle.json +++ b/src/appmixer/facebookbusiness/bundle.json @@ -1,7 +1,12 @@ { "name": "appmixer.facebookbusiness", - "version": "1.0.0", - "changelog": [ - "Initial version" - ] + "version": "2.0.0", + "changelog": { + "1.0.0": ["Initial version"], + "2.0.0": [ + "Support for sessions to upload large number of users.", + "New: RemoveMembersInCSVFromAudience, ReplaceMembersFromCSVInAudience", + "Changed: schema configuration for AddMembersFromCSVToAudience." + ] + } } diff --git a/src/appmixer/facebookbusiness/marketing/AddMemberToAudience/AddMemberToAudience.js b/src/appmixer/facebookbusiness/marketing/AddMemberToAudience/AddMemberToAudience.js index 4c0bb0492..607d92c06 100644 --- a/src/appmixer/facebookbusiness/marketing/AddMemberToAudience/AddMemberToAudience.js +++ b/src/appmixer/facebookbusiness/marketing/AddMemberToAudience/AddMemberToAudience.js @@ -1,3 +1,5 @@ +const { createHash } = require('crypto'); + module.exports = { async receive(context) { @@ -19,27 +21,47 @@ module.exports = { 'ST', 'ZIP', 'COUNTRY', - 'MADID' + 'MADID', + 'EXTERN_ID' ]; - const member = {}; - const schema = {}; + const member = []; + const schema = []; fields.forEach(field => { const value = context.messages.in.content[field]; if (value) { - member[field] = value; - schema[field] = field; + const normalizedValue = createHash('sha256').update(value).digest('hex'); + member.push(normalizedValue); + schema.push(field); } }); - const payload = { - schema: schema, - data: [member], + const body = { + payload: { + schema: schema, + data: [member] + }, access_token: accessToken }; - const url = `https://graph.facebook.com/v20.0/${audienceId}?access_token=${accessToken}`; - const response = await context.httpRequest.post(url, payload); - return context.sendJson({ accountId, audienceId, ...response.data }, 'out'); + const url = `https://graph.facebook.com/v20.0/${audienceId}/users`; + const response = await context.httpRequest.post(url, body); + + if (!response || !response.data || response.data.num_received !== 1) { + throw new Error(`Failed to add member to audience. Response: ${JSON.stringify({ + status: response.status, + statusText: response.statusText, + headers: response.headers, + data: response.data, + requestBody: body + })}`); + } + return context.sendJson({ + account_id: accountId, + audience_id: audienceId, + num_received: response.data.num_received, + num_invalid_entries: response.data.num_invalid_entries, + session_id: response.data.session_id + }, 'out'); } }; diff --git a/src/appmixer/facebookbusiness/marketing/AddMemberToAudience/component.json b/src/appmixer/facebookbusiness/marketing/AddMemberToAudience/component.json index dc2b80076..b37b190f0 100644 --- a/src/appmixer/facebookbusiness/marketing/AddMemberToAudience/component.json +++ b/src/appmixer/facebookbusiness/marketing/AddMemberToAudience/component.json @@ -37,7 +37,8 @@ "ST": { "type": "string" }, "ZIP": { "type": "string" }, "COUNTRY": { "type": "string" }, - "MADID": { "type": "string" } + "MADID": { "type": "string" }, + "EXTERN_ID": { "type": "string" } }, "required": ["audienceId"] }, @@ -73,17 +74,21 @@ "EMAIL": { "type": "text", "index": 3, - "label": "Email" + "label": "Email address" }, "PHONE": { "type": "text", "index": 4, - "label": "Phone" + "label": "Phone number" }, "GEN": { - "type": "text", + "type": "select", "index": 5, - "label": "Gender" + "label": "Gender", + "options": [ + { "value": "f", "label": "f" }, + { "value": "m", "label": "m" } + ] }, "DOBY": { "type": "text", @@ -146,6 +151,12 @@ "index": 12, "label": "Mobile Advertiser ID", "tooltip": "Use all lowercase, and keep hyphens." + }, + "EXTERN_ID": { + "type": "text", + "index": 13, + "label": "External ID", + "tooltip": "This can be any unique ID from the advertiser, such as loyalty membership IDs, user IDs, and external cookie IDs." } } } @@ -154,18 +165,23 @@ "outPorts": [ { "name": "out", - "options": [ - { - "value": "accountId", - "label": "Account ID", - "schema": { "type": "string" } - }, - { - "value": "audienceId", + "options": [{ + "value": "audience_id", "label": "Audience ID", "schema": { "type": "string" } - } - ] + }, { + "value": "session_id", + "label": "Session ID", + "schema": { "type": "string" } + }, { + "value": "num_received", + "label": "Number of Received Users", + "schema": { "type": "integer" } + }, { + "value": "num_invalid_entries", + "label": "Number of Invalid Entries", + "schema": { "type": "integer" } + }] } ], "icon": "" diff --git a/src/appmixer/facebookbusiness/marketing/AddMembersFromCSVToAudience/AddMembersFromCSVToAudience.js b/src/appmixer/facebookbusiness/marketing/AddMembersFromCSVToAudience/AddMembersFromCSVToAudience.js index b86a1975e..decc644cc 100644 --- a/src/appmixer/facebookbusiness/marketing/AddMembersFromCSVToAudience/AddMembersFromCSVToAudience.js +++ b/src/appmixer/facebookbusiness/marketing/AddMembersFromCSVToAudience/AddMembersFromCSVToAudience.js @@ -1,79 +1,186 @@ +const { randomInt, createHash } = require('crypto'); const csv = require('csv-parser'); -// Default schema config. This will be used if schema is not provided in the input. -let schemaConfig = { - 'email': 'EMAIL', - 'phone': 'PHONE_NUMBER', - 'first_name': 'FN', - 'last_name': 'LN' -}; +const BATCH_SIZE = 10000; module.exports = { async receive(context) { const { accountId, audienceId, fileId, schema } = context.messages.in.content; - const accessToken = context.auth.accessToken; + + const schemaConfig = {}; if (schema) { - schemaConfig = {}; - schema.split(',').forEach(value => { - const item = value.trim().split(':'); - const header = item[0].trim(); - const schemaKey = item[1].trim(); - schemaConfig[header] = schemaKey; + schema.ADD.forEach(item => { + schemaConfig[item.csvHeader.toLowerCase().trim()] = item.fbType; }); } - const results = []; - const fileStream = await context.getFileReadStream(fileId); - return new Promise((resolve, reject) => { - fileStream - .pipe(csv()) - .on('data', (data) => results.push(data)) - .on('error', (err) => reject(err)) - .on('end', async () => { - const { schema, data } = detectSchemaAndPrepareData(results); - - // Process in batches (https://developers.facebook.com/docs/marketing-api/audiences/guides/custom-audiences). - const batchSize = 5000; - for (let i = 0; i < data.length; i += batchSize) { - const batch = data.slice(i, i + batchSize); - const payload = { - schema: schema, - data: batch, - access_token: accessToken - }; - - const url = `https://graph.facebook.com/v20.0/${audienceId}?access_token=${accessToken}`; - try { - const response = await context.httpRequest.post(url, payload); - resolve(context.sendJson({ accountId, audienceId, ...response.data }, 'out')); - } catch (error) { - reject(error); - } + const fileInfo = await context.getFileInfo(fileId); + + const reader = fileStream.pipe(csv({ + mapHeaders: ({ header }) => header.toLowerCase().trim() + })); + + // randomInt limit is (max - min) < 2^48. See https://nodejs.org/api/crypto.html#cryptorandomintmin-max-callback. + const sessionId = randomInt(0, 2**48 - 1); + let batch = []; + let batchIndex = 0; + let estimatedMembersCount = 0; + let membersCount = 0; + + let numInvalidEntries = 0; + let invalidEntrySamples = []; + + // Process rows in a way so that we can detect the last row (and therefore last batch). + let previousRow = null; + + for await (const row of reader) { + membersCount += 1; + + if (previousRow !== null) { + batch.push(previousRow); + if (batch.length >= BATCH_SIZE) { + + if (batchIndex === 0) { + // First batch. Estimate the total number of members. + estimatedMembersCount = estimateNumberOfRows(fileInfo.length, batch); } - }); - }); + + const response = await sendBatchToFacebook( + context, + audienceId, + schemaConfig, + sessionId, + batch, + batchIndex, + false, + estimatedMembersCount + ); + await context.log({ step: 'batch-response', data: response.data, headers: response.headers }); + numInvalidEntries += response.data.num_invalid_entries; + invalidEntrySamples = invalidEntrySamples.concat(response.data.invalid_entry_samples || []); + batchIndex += 1; + batch = []; // Clear the batch + } + } + previousRow = row; + } + + if (previousRow !== null) { + batch.push(previousRow); + if (!estimatedMembersCount) { + estimatedMembersCount = estimateNumberOfRows(fileInfo.length, batch); + } + // Now process the last batch after loop ends. + const response = await sendBatchToFacebook( + context, + audienceId, + schemaConfig, + sessionId, + batch, + batchIndex, + true, + estimatedMembersCount + ); + await context.log({ step: 'batch-response', data: response.data, headers: response.headers }); + numInvalidEntries += response.data.num_invalid_entries; + invalidEntrySamples = invalidEntrySamples.concat(response.data.invalid_entry_samples || []); + } + + return context.sendJson({ + account_id: accountId, + audience_id: audienceId, + num_invalid_entries: numInvalidEntries, + invalid_entry_samples: invalidEntrySamples, + num_total_entries: membersCount + }, 'out'); } }; +function getRandomUniqueElements(array, numElements) { -// Detect schema and prepare data format -function detectSchemaAndPrepareData(users) { - let detectedSchema = []; - const data = users.map(user => { - let userData = []; - for (const key in user) { - if (schemaConfig[key]) { - userData.push(user[key]); - if (!detectedSchema.includes(schemaConfig[key])) { - detectedSchema.push(schemaConfig[key]); - } + if (numElements >= array.length) { + return array; + } + + const selectedIndices = new Set(); + const result = []; + + while (result.length < numElements) { + const index = Math.floor(Math.random() * array.length); + if (!selectedIndices.has(index)) { + selectedIndices.add(index); + result.push(array[index]); + } + } + + return result; +} + +function estimateNumberOfRows(fileSize, batch) { + + // Estimate the number of rows in the file. + // This is a very rough estimate and may not be accurate. + // We don't want to loop over the entire batch rows. Instead, + // we will randomly select a limited number of rows and calculate + // the avarage row size in Bytes, then divide the total file size by this number. + + const randomRows = getRandomUniqueElements(batch, 20); + let randomRowsSize = 0; + for (const row of randomRows) { + randomRowsSize += Buffer.from(Object.values(row).join('')).length; + } + const averageRowSize = randomRowsSize / randomRows.length; + return Math.floor(fileSize / averageRowSize) + 1; +} + +async function sendBatchToFacebook(context, audienceId, schemaConfig, sessionId, batch, batchIndex, isLastBatch, estimatedMembersCount) { + + const body = { + payload: { + schema: detectSchema(batch, schemaConfig), + data: prepareMembers(batch, schemaConfig) + }, + session: { + session_id: sessionId, + batch_seq: batchIndex + 1, + last_batch_flag: isLastBatch, + estimated_num_total: estimatedMembersCount + }, + access_token: context.auth.accessToken + }; + + const url = `https://graph.facebook.com/v20.0/${audienceId}/users`; + + await context.log({ step: 'batch', schema: body.payload.schema, session: body.session, size: batch.length }); + + return context.httpRequest.post(url, body); +} + +function detectSchema(batch, schemaConfig) { + + let schema = []; + for (const csvHeader in batch[0]) { + if (schemaConfig[csvHeader]) { + schema.push(schemaConfig[csvHeader]); + } + } + return schema; +} + +function prepareMembers(batch, schemaConfig) { + + return batch.map(member => { + const memberData = []; + for (const column in member) { + if (schemaConfig[column]) { + const value = createHash('sha256').update(member[column]).digest('hex'); + memberData.push(value); } } - return userData; + return memberData; }); - return { schema: detectedSchema, data }; } diff --git a/src/appmixer/facebookbusiness/marketing/AddMembersFromCSVToAudience/component.json b/src/appmixer/facebookbusiness/marketing/AddMembersFromCSVToAudience/component.json index d5f943bfe..089825ddc 100644 --- a/src/appmixer/facebookbusiness/marketing/AddMembersFromCSVToAudience/component.json +++ b/src/appmixer/facebookbusiness/marketing/AddMembersFromCSVToAudience/component.json @@ -25,7 +25,7 @@ "accountId": { "type": "string" }, "audienceId": { "type": "string" }, "fileId": { "type": "string" }, - "schema": { "type": "string" } + "schema": { "type": "object" } }, "required": ["audienceId", "fileId"] }, @@ -65,10 +65,40 @@ "tooltip": "Select a file or use a File ID returned from a previous step." }, "schema": { - "type": "text", + "type": "expression", + "levels": ["ADD"], "index": 3, "label": "Schema", - "tooltip": "Specify what type of information you will provide in the CSV file. A comma separated list of the following values: EMAIL, PHONE, GEN, DOBY, DOBM, DOBD, LN, FN, FI, CT, ST, ZIP, COUNTRY, MADID prefixed by a colon and the name of the column in the CSV file. Example: email:EMAIL,phone:PHONE." + "tooltip": "Specify what type of information you will provide in the CSV file. The order of the columns is not important.", + "fields": { + "csvHeader": { + "type": "text", + "label": "CSV Header", + "tooltip": "The name of the column in the CSV file. The column name is case insensitive." + }, + "fbType": { + "type": "select", + "label": "Facebook Type", + "tooltip": "The type of information in the column.", + "options": [ + { "value": "EMAIL", "label": "EMAIL (email address)" }, + { "value": "PHONE", "label": "PHONE (phone number)" }, + { "value": "GEN", "label": "GEN (gender - m|f)" }, + { "value": "DOBY", "label": "DOBY (birth year - YYYY)" }, + { "value": "DOBM", "label": "DOBM (birth month - MM - 01 to 12)" }, + { "value": "DOBD", "label": "DOBD (birthday - DD - 01 to 31)" }, + { "value": "LN", "label": "LN (last name)" }, + { "value": "FN", "label": "FN (first name)" }, + { "value": "FI", "label": "FI (first name initial)" }, + { "value": "CT", "label": "CT (city)" }, + { "value": "ST", "label": "ST (state - US 2-letter, Others lowercase)" }, + { "value": "ZIP", "label": "ZIP (zip code)" }, + { "value": "COUNTRY", "label": "COUNTRY (country code - 2-letter)" }, + { "value": "MADID", "label": "MADID (mobile advertiser ID)" }, + { "value": "EXTERN_ID", "label": "EXTERN_ID (your own ID)" } + ] + } + } } } } @@ -77,18 +107,27 @@ "outPorts": [ { "name": "out", - "options": [ - { - "value": "accountId", - "label": "Account ID", - "schema": { "type": "string" } - }, - { - "value": "audienceId", + "options": [{ + "value": "audience_id", "label": "Audience ID", "schema": { "type": "string" } - } - ] + }, { + "value": "account_id", + "label": "Account ID", + "schema": { "type": "string" } + }, { + "value": "invalid_entry_samples", + "label": "Invalid Entry Samples", + "schema": { "type": "array" } + }, { + "value": "num_invalid_entries", + "label": "Number of Invalid Entries", + "schema": { "type": "integer" } + }, { + "value": "num_total_entries", + "label": "Number of Entries", + "schema": { "type": "integer" } + }] } ], "icon": "" diff --git a/src/appmixer/facebookbusiness/marketing/RemoveMemberFromAudience/RemoveMemberFromAudience.js b/src/appmixer/facebookbusiness/marketing/RemoveMemberFromAudience/RemoveMemberFromAudience.js new file mode 100644 index 000000000..81d816374 --- /dev/null +++ b/src/appmixer/facebookbusiness/marketing/RemoveMemberFromAudience/RemoveMemberFromAudience.js @@ -0,0 +1,67 @@ +const { createHash } = require('crypto'); + +module.exports = { + + async receive(context) { + + const { accountId, audienceId } = context.messages.in.content; + const accessToken = context.auth.accessToken; + + const fields = [ + 'EMAIL', + 'PHONE', + 'GEN', + 'DOBY', + 'DOBM', + 'DOBD', + 'FN', + 'LN', + 'FI', + 'CT', + 'ST', + 'ZIP', + 'COUNTRY', + 'MADID', + 'EXTERN_ID' + ]; + + const member = []; + const schema = []; + fields.forEach(field => { + const value = context.messages.in.content[field]; + if (value) { + const normalizedValue = createHash('sha256').update(value).digest('hex'); + member.push(normalizedValue); + schema.push(field); + } + }); + + const body = { + payload: { + schema: schema, + data: [member] + }, + access_token: accessToken + }; + + const url = `https://graph.facebook.com/v20.0/${audienceId}/users`; + const response = await context.httpRequest.delete(url, { data: body }); + + if (!response || !response.data || response.data.num_received !== 1) { + throw new Error(`Failed to add member to audience. Response: ${JSON.stringify({ + status: response.status, + statusText: response.statusText, + headers: response.headers, + data: response.data, + requestBody: body + })}`); + } + return context.sendJson({ + account_id: accountId, + audience_id: audienceId, + num_received: response.data.num_received, + num_invalid_entries: response.data.num_invalid_entries, + session_id: response.data.session_id + }, 'out'); + } +}; diff --git a/src/appmixer/facebookbusiness/marketing/RemoveMemberFromAudience/component.json b/src/appmixer/facebookbusiness/marketing/RemoveMemberFromAudience/component.json new file mode 100644 index 000000000..dc7e29b06 --- /dev/null +++ b/src/appmixer/facebookbusiness/marketing/RemoveMemberFromAudience/component.json @@ -0,0 +1,190 @@ +{ + "name": "appmixer.facebookbusiness.marketing.RemoveMemberFromAudience", + "author": "Appmixer ", + "description": "Remove a single customer from a custom audience.", + "auth": { + "service": "appmixer:facebookbusiness", + "scope": [ + "ads_management", + "ads_read" + ] + }, + "quota": { + "manager": "appmixer:facebookbusiness", + "resources": "requests", + "scope": { + "userId": "{{userId}}" + } + }, + "inPorts": [ + { + "name": "in", + "schema": { + "type": "object", + "properties": { + "accountId": { "type": "string" }, + "audienceId": { "type": "string" }, + "EMAIL": { "type": "string" }, + "PHONE": { "type": "string" }, + "GEN": { "type": "string" }, + "DOBY": { "type": "string" }, + "DOBM": { "type": "string" }, + "DOBD": { "type": "string" }, + "FN": { "type": "string" }, + "LN": { "type": "string" }, + "FI": { "type": "string" }, + "CT": { "type": "string" }, + "ST": { "type": "string" }, + "ZIP": { "type": "string" }, + "COUNTRY": { "type": "string" }, + "MADID": { "type": "string" }, + "EXTERN_ID": { "type": "string" } + }, + "required": ["audienceId"] + }, + "inspector": { + "inputs": { + "accountId": { + "type": "select", + "label": "Account ID", + "index": 1, + "tooltip": "Enter your Ad Account ID.", + "source": { + "url": "/component/appmixer/facebookbusiness/marketing/GetAdAccounts?outPort=out", + "data": { + "transform": "./GetAdAccounts#toSelectArray" + } + } + }, + "audienceId": { + "type": "select", + "label": "Custom Audience ID", + "index": 2, + "tooltip": "Enter your Custom Audience ID.", + "source": { + "url": "/component/appmixer/facebookbusiness/marketing/GetCustomAudiences?outPort=out", + "data": { + "transform": "./GetCustomAudiences#toSelectArray", + "messages": { + "in/accountId": "inputs/in/accountId" + } + } + } + }, + "EMAIL": { + "type": "text", + "index": 3, + "label": "Email address" + }, + "PHONE": { + "type": "text", + "index": 4, + "label": "Phone number" + }, + "GEN": { + "type": "select", + "index": 5, + "label": "Gender", + "options": [ + { "value": "f", "label": "f" }, + { "value": "m", "label": "m" } + ] + }, + "DOBY": { + "type": "text", + "index": 6, + "label": "Birth year", + "tooltip": "Use the YYYY format: 1900 to the current year." + }, + "DOBM": { + "type": "text", + "index": 7, + "label": "Birth month", + "tooltip": "Use the MM format: 01 to 12." + }, + "DOBD": { + "type": "text", + "index": 8, + "label": "Birth day", + "tooltip": "Use the DD format: 01 to 31." + }, + "FN": { + "type": "text", + "index": 9, + "label": "First Name" + }, + "LN": { + "type": "text", + "index": 10, + "label": "Last Name" + }, + "FI": { + "type": "text", + "index": 11, + "label": "First Name Initial" + }, + "CT": { + "type": "text", + "index": 12, + "label": "City" + }, + "ST": { + "type": "text", + "index": 12, + "label": "US State", + "tooltip": "Use the 2-character ANSI abbreviation code, lowercase." + }, + "ZIP": { + "type": "text", + "index": 12, + "label": "ZIP Code", + "tooltip": "Use lowercase, and no white space. For the US, use only the first 5 digits. For the UK, use the Area/District/Sector format." + }, + "COUNTRY": { + "type": "text", + "index": 12, + "label": "Country", + "tooltip": "Use lowercase, 2-letter country codes in ISO 3166-1 alpha-2." + }, + "MADID": { + "type": "text", + "index": 12, + "label": "Mobile Advertiser ID", + "tooltip": "Use all lowercase, and keep hyphens." + }, + "EXTERN_ID": { + "type": "text", + "index": 13, + "label": "External ID", + "tooltip": "This can be any unique ID from the advertiser, such as loyalty membership IDs, user IDs, and external cookie IDs." + } + } + } + } + ], + "outPorts": [ + { + "name": "out", + "options": [{ + "value": "audience_id", + "label": "Audience ID", + "schema": { "type": "string" } + }, { + "value": "session_id", + "label": "Session ID", + "schema": { "type": "string" } + }, { + "value": "num_received", + "label": "Number of Received Users", + "schema": { "type": "integer" } + }, { + "value": "num_invalid_entries", + "label": "Number of Invalid Entries", + "schema": { "type": "integer" } + }] + } + ], + "icon": "" +} + + diff --git a/src/appmixer/facebookbusiness/marketing/RemoveMembersInCSVFromAudience/RemoveMembersInCSVFromAudience.js b/src/appmixer/facebookbusiness/marketing/RemoveMembersInCSVFromAudience/RemoveMembersInCSVFromAudience.js new file mode 100644 index 000000000..a1343498c --- /dev/null +++ b/src/appmixer/facebookbusiness/marketing/RemoveMembersInCSVFromAudience/RemoveMembersInCSVFromAudience.js @@ -0,0 +1,186 @@ +const { randomInt, createHash } = require('crypto'); +const csv = require('csv-parser'); + +const BATCH_SIZE = 10000; + +module.exports = { + + async receive(context) { + + const { accountId, audienceId, fileId, schema } = context.messages.in.content; + + const schemaConfig = {}; + + if (schema) { + schema.ADD.forEach(item => { + schemaConfig[item.csvHeader.toLowerCase().trim()] = item.fbType; + }); + } + + const fileStream = await context.getFileReadStream(fileId); + const fileInfo = await context.getFileInfo(fileId); + + const reader = fileStream.pipe(csv({ + mapHeaders: ({ header }) => header.toLowerCase().trim() + })); + + // randomInt limit is (max - min) < 2^48. See https://nodejs.org/api/crypto.html#cryptorandomintmin-max-callback. + const sessionId = randomInt(0, 2**48 - 1); + let batch = []; + let batchIndex = 0; + let estimatedMembersCount = 0; + let membersCount = 0; + + let numInvalidEntries = 0; + let invalidEntrySamples = []; + + // Process rows in a way so that we can detect the last row (and therefore last batch). + let previousRow = null; + + for await (const row of reader) { + membersCount += 1; + + if (previousRow !== null) { + batch.push(previousRow); + if (batch.length >= BATCH_SIZE) { + + if (batchIndex === 0) { + // First batch. Estimate the total number of members. + estimatedMembersCount = estimateNumberOfRows(fileInfo.length, batch); + } + + const response = await sendBatchToFacebook( + context, + audienceId, + schemaConfig, + sessionId, + batch, + batchIndex, + false, + estimatedMembersCount + ); + await context.log({ step: 'batch-response', data: response.data, headers: response.headers }); + numInvalidEntries += response.data.num_invalid_entries; + invalidEntrySamples = invalidEntrySamples.concat(response.data.invalid_entry_samples || []); + batchIndex += 1; + batch = []; // Clear the batch + } + } + previousRow = row; + } + + if (previousRow !== null) { + batch.push(previousRow); + if (!estimatedMembersCount) { + estimatedMembersCount = estimateNumberOfRows(fileInfo.length, batch); + } + // Now process the last batch after loop ends. + const response = await sendBatchToFacebook( + context, + audienceId, + schemaConfig, + sessionId, + batch, + batchIndex, + true, + estimatedMembersCount + ); + await context.log({ step: 'batch-response', data: response.data, headers: response.headers }); + numInvalidEntries += response.data.num_invalid_entries; + invalidEntrySamples = invalidEntrySamples.concat(response.data.invalid_entry_samples || []); + } + + return context.sendJson({ + account_id: accountId, + audience_id: audienceId, + num_invalid_entries: numInvalidEntries, + invalid_entry_samples: invalidEntrySamples, + num_total_entries: membersCount + }, 'out'); + } +}; + +function getRandomUniqueElements(array, numElements) { + + if (numElements >= array.length) { + return array; + } + + const selectedIndices = new Set(); + const result = []; + + while (result.length < numElements) { + const index = Math.floor(Math.random() * array.length); + if (!selectedIndices.has(index)) { + selectedIndices.add(index); + result.push(array[index]); + } + } + + return result; +} + +function estimateNumberOfRows(fileSize, batch) { + + // Estimate the number of rows in the file. + // This is a very rough estimate and may not be accurate. + // We don't want to loop over the entire batch rows. Instead, + // we will randomly select a limited number of rows and calculate + // the avarage row size in Bytes, then divide the total file size by this number. + + const randomRows = getRandomUniqueElements(batch, 20); + let randomRowsSize = 0; + for (const row of randomRows) { + randomRowsSize += Buffer.from(Object.values(row).join('')).length; + } + const averageRowSize = randomRowsSize / randomRows.length; + return Math.floor(fileSize / averageRowSize) + 1; +} + +async function sendBatchToFacebook(context, audienceId, schemaConfig, sessionId, batch, batchIndex, isLastBatch, estimatedMembersCount) { + + const body = { + payload: { + schema: detectSchema(batch, schemaConfig), + data: prepareMembers(batch, schemaConfig) + }, + session: { + session_id: sessionId, + batch_seq: batchIndex + 1, + last_batch_flag: isLastBatch, + estimated_num_total: estimatedMembersCount + }, + access_token: context.auth.accessToken + }; + + const url = `https://graph.facebook.com/v20.0/${audienceId}/users`; + + await context.log({ step: 'batch', schema: body.payload.schema, session: body.session, size: batch.length }); + + return context.httpRequest.delete(url, { data: body }); +} + +function detectSchema(batch, schemaConfig) { + + let schema = []; + for (const csvHeader in batch[0]) { + if (schemaConfig[csvHeader]) { + schema.push(schemaConfig[csvHeader]); + } + } + return schema; +} + +function prepareMembers(batch, schemaConfig) { + + return batch.map(member => { + const memberData = []; + for (const column in member) { + if (schemaConfig[column]) { + const value = createHash('sha256').update(member[column]).digest('hex'); + memberData.push(value); + } + } + return memberData; + }); +} diff --git a/src/appmixer/facebookbusiness/marketing/RemoveMembersInCSVFromAudience/component.json b/src/appmixer/facebookbusiness/marketing/RemoveMembersInCSVFromAudience/component.json new file mode 100644 index 000000000..3dd43819b --- /dev/null +++ b/src/appmixer/facebookbusiness/marketing/RemoveMembersInCSVFromAudience/component.json @@ -0,0 +1,131 @@ +{ + "name": "appmixer.facebookbusiness.marketing.RemoveMembersInCSVFromAudience", + "author": "Appmixer ", + "description": "Remove customers in a CSV file from a custom audience.", + "auth": { + "service": "appmixer:facebookbusiness", + "scope": [ + "ads_management", + "ads_read" + ] + }, + "quota": { + "manager": "appmixer:facebookbusiness", + "resources": "requests", + "scope": { + "userId": "{{userId}}" + } + }, + "inPorts": [ + { + "name": "in", + "schema": { + "type": "object", + "properties": { + "accountId": { "type": "string" }, + "audienceId": { "type": "string" }, + "fileId": { "type": "string" }, + "schema": { "type": "object" } + }, + "required": ["audienceId", "fileId"] + }, + "inspector": { + "inputs": { + "accountId": { + "type": "select", + "label": "Account ID", + "index": 1, + "tooltip": "Enter your Ad Account ID.", + "source": { + "url": "/component/appmixer/facebookbusiness/marketing/GetAdAccounts?outPort=out", + "data": { + "transform": "./GetAdAccounts#toSelectArray" + } + } + }, + "audienceId": { + "type": "select", + "label": "Custom Audience ID", + "index": 2, + "tooltip": "Enter your Custom Audience ID.", + "source": { + "url": "/component/appmixer/facebookbusiness/marketing/GetCustomAudiences?outPort=out", + "data": { + "transform": "./GetCustomAudiences#toSelectArray", + "messages": { + "in/accountId": "inputs/in/accountId" + } + } + } + }, + "fileId": { + "type": "filepicker", + "index": 2, + "label": "File ID", + "tooltip": "Select a file or use a File ID returned from a previous step." + }, + "schema": { + "type": "expression", + "levels": ["ADD"], + "index": 3, + "label": "Schema", + "tooltip": "Specify what type of information you will provide in the CSV file. The order of the columns is not important.", + "fields": { + "csvHeader": { + "type": "text", + "label": "CSV Header", + "tooltip": "The name of the column in the CSV file. The column name is case insensitive." + }, + "fbType": { + "type": "select", + "label": "Facebook Type", + "tooltip": "The type of information in the column.", + "options": [ + { "value": "EMAIL", "label": "EMAIL (email address)" }, + { "value": "PHONE", "label": "PHONE (phone number)" }, + { "value": "GEN", "label": "GEN (gender - m|f)" }, + { "value": "DOBY", "label": "DOBY (birth year - YYYY)" }, + { "value": "DOBM", "label": "DOBM (birth month - MM - 01 to 12)" }, + { "value": "DOBD", "label": "DOBD (birthday - DD - 01 to 31)" }, + { "value": "LN", "label": "LN (last name)" }, + { "value": "FN", "label": "FN (first name)" }, + { "value": "FI", "label": "FI (first name initial)" }, + { "value": "CT", "label": "CT (city)" }, + { "value": "ST", "label": "ST (state - US 2-letter, Others lowercase)" }, + { "value": "ZIP", "label": "ZIP (zip code)" }, + { "value": "COUNTRY", "label": "COUNTRY (country code - 2-letter)" }, + { "value": "MADID", "label": "MADID (mobile advertiser ID)" }, + { "value": "EXTERN_ID", "label": "EXTERN_ID (your own ID)" } + ] + } + } + } + } + } + } + ], + "outPorts": [ + { + "name": "out", + "options": [{ + "value": "audience_id", + "label": "Audience ID", + "schema": { "type": "string" } + }, { + "value": "account_id", + "label": "Account ID", + "schema": { "type": "string" } + }, { + "value": "invalid_entry_samples", + "label": "Invalid Entry Samples", + "schema": { "type": "array" } + }, { + "value": "num_invalid_entries", + "label": "Number of Invalid Entries", + "schema": { "type": "integer" } + }] + } + ], + "icon": "" +} + diff --git a/src/appmixer/facebookbusiness/marketing/ReplaceMembersFromCSVInAudience/ReplaceMembersFromCSVInAudience.js b/src/appmixer/facebookbusiness/marketing/ReplaceMembersFromCSVInAudience/ReplaceMembersFromCSVInAudience.js new file mode 100644 index 000000000..25280c642 --- /dev/null +++ b/src/appmixer/facebookbusiness/marketing/ReplaceMembersFromCSVInAudience/ReplaceMembersFromCSVInAudience.js @@ -0,0 +1,235 @@ +const { randomInt, createHash } = require('crypto'); +const csv = require('csv-parser'); + +const BATCH_SIZE = 10000; +// See https://developers.facebook.com/docs/marketing-api/audiences/guides/custom-audiences#replace-api. +const REPLACE_SESSION_DURATION_WINDOW = 1000 * 60 * 88; // 88 minutes to stay on the safe side. + +module.exports = { + + async receive(context) { + + const { accountId, audienceId, fileId, schema } = context.messages.in.content; + + const schemaConfig = {}; + + if (schema) { + schema.ADD.forEach(item => { + schemaConfig[item.csvHeader.toLowerCase().trim()] = item.fbType; + }); + } + + const fileStream = await context.getFileReadStream(fileId); + const fileInfo = await context.getFileInfo(fileId); + + const reader = fileStream.pipe(csv({ + mapHeaders: ({ header }) => header.toLowerCase().trim() + })); + + // randomInt limit is (max - min) < 2^48. See https://nodejs.org/api/crypto.html#cryptorandomintmin-max-callback. + let sessionId = randomInt(0, 2**48 - 1); + let batch = []; + let batchIndex = 0; + let estimatedMembersCount = 0; + let membersCount = 0; + let operation = 'usersreplace'; + let isLastBatch = false; + + let numInvalidEntries = 0; + let invalidEntrySamples = []; + + const replaceStartedAt = new Date(); + + // Process rows in a way so that we can detect the last row (and therefore last batch). + let previousRow = null; + + for await (const row of reader) { + membersCount += 1; + isLastBatch = false; + + const timeElapsed = new Date() - replaceStartedAt; + const isReplaceSessionExpiring = timeElapsed > REPLACE_SESSION_DURATION_WINDOW; + + if (operation === 'usersreplace' && isReplaceSessionExpiring) { + // The maximum duration window for 1 replace session is 90 minutes. (We use 88 minutes to stay on the safe side.) + // The API will reject any batches for a session received after 90 minutes from the time the session started. + // If you need to send batches for a duration longer than 90 minutes, + // wait until the replace operation for that session is done, + // then use the //users endpoint’s add operation for the rest of your uploads. + // See https://developers.facebook.com/docs/marketing-api/audiences/guides/custom-audiences#replace-api. + isLastBatch = true; // The very next batch will be the last batch in the replace session. + await context.log({ + step: 'replace-session-expiring', + message: 'Replace session expiring. Uploading last batch and switching to add operation.', + timeElapsedMs: timeElapsed + }); + } + + if (previousRow !== null) { + batch.push(previousRow); + if (batch.length >= BATCH_SIZE) { + + if (batchIndex === 0) { + // First batch. Estimate the total number of members. + estimatedMembersCount = estimateNumberOfRows(fileInfo.length, batch); + } + + const response = await sendBatchToFacebook( + context, + audienceId, + schemaConfig, + sessionId, + batch, + batchIndex, + isLastBatch, + estimatedMembersCount, + operation + ); + await context.log({ step: 'batch-response', data: response.data, headers: response.headers }); + numInvalidEntries += response.data.num_invalid_entries; + invalidEntrySamples = invalidEntrySamples.concat(response.data.invalid_entry_samples || []); + batchIndex += 1; + batch = []; // Clear the batch + + if (operation === 'usersreplace' && isReplaceSessionExpiring) { + // Reset session, switch to POST /users. + sessionId = randomInt(0, 2**48 - 1); + batchIndex = 0; + operation = 'users'; + } + } + } + previousRow = row; + } + + if (previousRow !== null) { + batch.push(previousRow); + if (!estimatedMembersCount) { + estimatedMembersCount = estimateNumberOfRows(fileInfo.length, batch); + } + isLastBatch = true; + // Now process the last batch after loop ends. + const response = await sendBatchToFacebook( + context, + audienceId, + schemaConfig, + sessionId, + batch, + batchIndex, + isLastBatch, + estimatedMembersCount, + operation + ); + await context.log({ step: 'batch-response', data: response.data, headers: response.headers }); + numInvalidEntries += response.data.num_invalid_entries; + invalidEntrySamples = invalidEntrySamples.concat(response.data.invalid_entry_samples || []); + } + + return context.sendJson({ + account_id: accountId, + audience_id: audienceId, + num_invalid_entries: numInvalidEntries, + invalid_entry_samples: invalidEntrySamples, + num_total_entries: membersCount + }, 'out'); + } +}; + +function getRandomUniqueElements(array, numElements) { + + if (numElements >= array.length) { + return array; + } + + const selectedIndices = new Set(); + const result = []; + + while (result.length < numElements) { + const index = Math.floor(Math.random() * array.length); + if (!selectedIndices.has(index)) { + selectedIndices.add(index); + result.push(array[index]); + } + } + + return result; +} + +function estimateNumberOfRows(fileSize, batch) { + + // Estimate the number of rows in the file. + // This is a very rough estimate and may not be accurate. + // We don't want to loop over the entire batch rows. Instead, + // we will randomly select a limited number of rows and calculate + // the avarage row size in Bytes, then divide the total file size by this number. + + const randomRows = getRandomUniqueElements(batch, 20); + let randomRowsSize = 0; + for (const row of randomRows) { + randomRowsSize += Buffer.from(Object.values(row).join('')).length; + } + const averageRowSize = randomRowsSize / randomRows.length; + return Math.floor(fileSize / averageRowSize) + 1; +} + +async function sendBatchToFacebook(context, audienceId, schemaConfig, sessionId, batch, batchIndex, isLastBatch, estimatedMembersCount, operation) { + + const body = { + payload: { + schema: detectSchema(batch, schemaConfig), + data: prepareMembers(batch, schemaConfig) + }, + session: { + session_id: sessionId, + batch_seq: batchIndex + 1, + last_batch_flag: isLastBatch, + estimated_num_total: estimatedMembersCount + }, + access_token: context.auth.accessToken + }; + + const url = `https://graph.facebook.com/v20.0/${audienceId}/${operation}`; + + await context.log({ step: 'batch', schema: body.payload.schema, session: body.session, size: batch.length, operation }); + + let response;; + + try { + response = await context.httpRequest.post(url, body); + } catch (error) { + await context.log({ + step: 'batch-error', + error: error.message, + data: error.response.data, + headers: error.response.headers, + status: error.response.status + }); + throw error; + } + return response; +} + +function detectSchema(batch, schemaConfig) { + + let schema = []; + for (const csvHeader in batch[0]) { + if (schemaConfig[csvHeader]) { + schema.push(schemaConfig[csvHeader]); + } + } + return schema; +} + +function prepareMembers(batch, schemaConfig) { + + return batch.map(member => { + const memberData = []; + for (const column in member) { + if (schemaConfig[column]) { + const value = createHash('sha256').update(member[column]).digest('hex'); + memberData.push(value); + } + } + return memberData; + }); +} diff --git a/src/appmixer/facebookbusiness/marketing/ReplaceMembersFromCSVInAudience/component.json b/src/appmixer/facebookbusiness/marketing/ReplaceMembersFromCSVInAudience/component.json new file mode 100644 index 000000000..d639e1a33 --- /dev/null +++ b/src/appmixer/facebookbusiness/marketing/ReplaceMembersFromCSVInAudience/component.json @@ -0,0 +1,135 @@ +{ + "name": "appmixer.facebookbusiness.marketing.ReplaceMembersFromCSVInAudience", + "author": "Appmixer ", + "description": "Replace customers from a CSV file in a custom audience.", + "auth": { + "service": "appmixer:facebookbusiness", + "scope": [ + "ads_management", + "ads_read" + ] + }, + "quota": { + "manager": "appmixer:facebookbusiness", + "resources": "requests", + "scope": { + "userId": "{{userId}}" + } + }, + "inPorts": [ + { + "name": "in", + "schema": { + "type": "object", + "properties": { + "accountId": { "type": "string" }, + "audienceId": { "type": "string" }, + "fileId": { "type": "string" }, + "schema": { "type": "object" } + }, + "required": ["audienceId", "fileId"] + }, + "inspector": { + "inputs": { + "accountId": { + "type": "select", + "label": "Account ID", + "index": 1, + "tooltip": "Enter your Ad Account ID.", + "source": { + "url": "/component/appmixer/facebookbusiness/marketing/GetAdAccounts?outPort=out", + "data": { + "transform": "./GetAdAccounts#toSelectArray" + } + } + }, + "audienceId": { + "type": "select", + "label": "Custom Audience ID", + "index": 2, + "tooltip": "Enter your Custom Audience ID.", + "source": { + "url": "/component/appmixer/facebookbusiness/marketing/GetCustomAudiences?outPort=out", + "data": { + "transform": "./GetCustomAudiences#toSelectArray", + "messages": { + "in/accountId": "inputs/in/accountId" + } + } + } + }, + "fileId": { + "type": "filepicker", + "index": 2, + "label": "File ID", + "tooltip": "Select a file or use a File ID returned from a previous step." + }, + "schema": { + "type": "expression", + "levels": ["ADD"], + "index": 3, + "label": "Schema", + "tooltip": "Specify what type of information you will provide in the CSV file. The order of the columns is not important.", + "fields": { + "csvHeader": { + "type": "text", + "label": "CSV Header", + "tooltip": "The name of the column in the CSV file. The column name is case insensitive." + }, + "fbType": { + "type": "select", + "label": "Facebook Type", + "tooltip": "The type of information in the column.", + "options": [ + { "value": "EMAIL", "label": "EMAIL (email address)" }, + { "value": "PHONE", "label": "PHONE (phone number)" }, + { "value": "GEN", "label": "GEN (gender - m|f)" }, + { "value": "DOBY", "label": "DOBY (birth year - YYYY)" }, + { "value": "DOBM", "label": "DOBM (birth month - MM - 01 to 12)" }, + { "value": "DOBD", "label": "DOBD (birthday - DD - 01 to 31)" }, + { "value": "LN", "label": "LN (last name)" }, + { "value": "FN", "label": "FN (first name)" }, + { "value": "FI", "label": "FI (first name initial)" }, + { "value": "CT", "label": "CT (city)" }, + { "value": "ST", "label": "ST (state - US 2-letter, Others lowercase)" }, + { "value": "ZIP", "label": "ZIP (zip code)" }, + { "value": "COUNTRY", "label": "COUNTRY (country code - 2-letter)" }, + { "value": "MADID", "label": "MADID (mobile advertiser ID)" }, + { "value": "EXTERN_ID", "label": "EXTERN_ID (your own ID)" } + ] + } + } + } + } + } + } + ], + "outPorts": [ + { + "name": "out", + "options": [{ + "value": "audience_id", + "label": "Audience ID", + "schema": { "type": "string" } + }, { + "value": "account_id", + "label": "Account ID", + "schema": { "type": "string" } + }, { + "value": "invalid_entry_samples", + "label": "Invalid Entry Samples", + "schema": { "type": "array" } + }, { + "value": "num_invalid_entries", + "label": "Number of Invalid Entries", + "schema": { "type": "integer" } + }, { + "value": "num_total_entries", + "label": "Number of Entries", + "schema": { "type": "integer" } + }] + } + ], + "icon": "" +} +