Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: onboard initial marketo builk upload implementation #5114

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion router/batchrouter/asyncdestinationmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newRegularManager(
case "BINGADS_OFFLINE_CONVERSIONS":
return bingads_offline_conversions.NewManager(conf, logger, statsFactory, destination, backendConfig)
case "MARKETO_BULK_UPLOAD":
return marketobulkupload.NewManager(conf, logger, statsFactory, destination)
return marketobulkupload.NewManager(logger, statsFactory, destination)
case "ELOQUA":
return eloqua.NewManager(logger, statsFactory, destination)
case "YANDEX_METRICA_OFFLINE_EVENTS":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package marketobulkupload

import (
"encoding/csv"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
)

type MarketoAPIService struct {
logger logger.Logger
statsFactory stats.Stats
httpClient *http.Client
munchkinId string
authService *MarketoAuthService
}

type APIError struct {
StatusCode int64
Category string
Message string
}

func (m *MarketoAPIService) checkForCSVLikeResponse(resp *http.Response) bool {
// check for csv like response by checking the headers
respHeaders := resp.Header
return respHeaders.Get("Content-Type") == "text/csv;charset=UTF-8"
}

func (m *MarketoAPIService) ImportLeads(csvFilePath, deduplicationField string) (string, *APIError) {
uploadTimeStat := m.statsFactory.NewTaggedStat("async_upload_time", stats.TimerType, map[string]string{
"module": "batch_router",
"destType": "MARKETO_BULK_UPLOAD",
})

uploadURL := fmt.Sprintf("https://%s.mktorest.com/bulk/v1/leads.json", m.munchkinId)
token, err := m.authService.GetAccessToken()
if err != nil {
return "", &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in fetching access token"}
}

// send request
startTime := time.Now()
resp, err := sendHTTPRequest(uploadURL, csvFilePath, token, deduplicationField)
uploadTimeStat.Since(startTime)

if err != nil {
return "", &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in sending request"}
}

defer resp.Body.Close()

if resp.StatusCode != 200 {
return "", &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in sending request"}
}

responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return "", &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in reading response body"}
}

var marketoResponse MarketoResponse

err = json.Unmarshal(responseBody, &marketoResponse)

if err != nil {
return "", &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in parsing response body"}
}

statusCode, category, errorMessage := parseMarketoResponse(marketoResponse)
if category == "Success" {
return marketoResponse.Result[0].ImportID, nil
}

return "", &APIError{StatusCode: statusCode, Category: category, Message: errorMessage}
}

func (m *MarketoAPIService) PollImportStatus(importId string) (*MarketoResponse, *APIError) {
// poll for the import status

apiURL := fmt.Sprintf("https://%s.mktorest.com/bulk/v1/leads/batch/%s.json", m.munchkinId, importId)
token, err := m.authService.GetAccessToken()
if err != nil {
return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in fetching access token"}
}

// Make the API request
req, err := http.NewRequest("GET", apiURL, nil)

if err != nil {
return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in creating request"}
}
req.Header.Add("Authorization", "Bearer "+token)

resp, err := m.httpClient.Do(req)
if err != nil {
return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in sending request"}
}

defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in reading response body"}
}

var marketoResponse MarketoResponse

err = json.Unmarshal(body, &marketoResponse)
if err != nil {
return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in parsing response body"}
}

m.logger.Debugf("[Async Destination Manager] Marketo Poll Response: %v", marketoResponse)

statusCode, category, errorMessage := parseMarketoResponse(marketoResponse)
if category == "Success" {
return &marketoResponse, nil
}

return nil, &APIError{StatusCode: statusCode, Category: category, Message: errorMessage}

}

func (m *MarketoAPIService) GetLeadStatus(url string) ([]map[string]string, *APIError) {

req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in creating request"}
}

token, err := m.authService.GetAccessToken()
if err != nil {
return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in fetching access token"}
}

req.Header.Add("Authorization", "Bearer "+token)

resp, err := m.httpClient.Do(req)
if err != nil {
return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in sending request"}
}

defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in reading response body"}
}

m.logger.Debugf("[Async Destination Manager] Marketo Get Lead Status Response: %v", string(body))

if !m.checkForCSVLikeResponse(resp) {
var marketoResponse MarketoResponse
err = json.Unmarshal(body, &marketoResponse)
if err != nil {
return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in parsing response body"}
}

statusCode, category, errorMessage := parseMarketoResponse(marketoResponse)
// if the response is not a csv like response, then it should be a json response
return nil, &APIError{StatusCode: statusCode, Category: category, Message: errorMessage}
}

// if the response is a csv like response
// parse the csv response

reader := csv.NewReader(strings.NewReader(string(body)))

rows, err := reader.ReadAll()
if err != nil {
return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: "Error in parsing csv response"}
}

if len(rows) == 0 {
return nil, &APIError{StatusCode: 500, Category: "Retryable", Message: "No data in csv response"}
}

// The first row is the header
header := rows[0]

records := make([]map[string]string, 0, len(rows)-1)

for _, row := range rows[1:] {
record := make(map[string]string)
for i, value := range row {
if i < len(header) {
record[header[i]] = value
}
}
records = append(records, record)
}

return records, nil

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package marketobulkupload

import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)

type MarketoAccessToken struct {
AccessToken string `json:"access_token"`
TokenType string `json:"token_type"`
ExpiresIn int64 `json:"expires_in"`
FetchedAt int64
Scope string `json:"scope"`
}

type MarketoAuthService struct {
httpCLient *http.Client
munchkinId string
clientId string
clientSecret string
accessToken MarketoAccessToken
}

func (m *MarketoAuthService) fetchOrUpdateAccessToken() error {

accessTokenURL := fmt.Sprintf("https://%s.mktorest.com/identity/oauth/token?client_id=%s&client_secret=%s&grant_type=client_credentials", m.munchkinId, m.clientId, m.clientSecret)
req, err := http.NewRequest("POST", accessTokenURL, nil)
if err != nil {
return err
}

resp, err := m.httpCLient.Do(req)
if err != nil {
return err
}

defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}

var accessToken MarketoAccessToken
err = json.Unmarshal(body, &accessToken)
if err != nil {
return err
}

accessToken.FetchedAt = time.Now().Unix()
m.accessToken = accessToken
return nil

}

func (m *MarketoAuthService) GetAccessToken() (string, error) {
// If the access token is nil or about to expire in 1 min, get a new one
if m.accessToken.AccessToken == "" || m.accessToken.FetchedAt+m.accessToken.ExpiresIn-60 < time.Now().Unix() {
err := m.fetchOrUpdateAccessToken()
if err != nil {
return "", err
}
}
return m.accessToken.AccessToken, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package marketobulkupload

import "encoding/json"

func (m *MarketoConfig) UnmarshalJSON(data []byte) error {
var intermediate intermediateMarketoConfig
if err := json.Unmarshal(data, &intermediate); err != nil {
return err
}

m.ClientId = intermediate.ClientId
m.ClientSecret = intermediate.ClientSecret
m.MunchkinId = intermediate.MunchkinId
m.DeduplicationField = intermediate.DeduplicationField
m.FieldsMapping = make(map[string]string)

for _, mapping := range intermediate.ColumnFieldsMapping {
from, fromOk := mapping["from"]
to, toOk := mapping["to"]
if fromOk && toOk {
m.FieldsMapping[from] = to
}
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package marketobulkupload

import (
stdjson "encoding/json"
"fmt"
"net/http"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
)

func NewManager(logger logger.Logger, statsFactory stats.Stats, destination *backendconfig.DestinationT) (*MarketoBulkUploader, error) {
destConfig := MarketoConfig{}
jsonConfig, err := stdjson.Marshal(destination.Config)
if err != nil {
return nil, fmt.Errorf("error in marshalling destination config: %v", err)
}
err = stdjson.Unmarshal(jsonConfig, &destConfig)
if err != nil {
return nil, fmt.Errorf("error in unmarshalling destination config: %v", err)
}

destName := destination.DestinationDefinition.Name

return NewMarketoBulkUploader(destName, statsFactory, destConfig), nil
}

func NewMarketoBulkUploader(destinationName string, statsFactory stats.Stats, destConfig MarketoConfig) *MarketoBulkUploader {

authService := &MarketoAuthService{
munchkinId: destConfig.MunchkinId,
clientId: destConfig.ClientId,
clientSecret: destConfig.ClientSecret,
httpCLient: &http.Client{},
}

apiService := &MarketoAPIService{
logger: logger.NewLogger().Child("batchRouter").Child("AsyncDestinationManager").Child("Marketo").Child("Marketo Builk Upload").Child("API Service"),
statsFactory: statsFactory,
httpClient: &http.Client{},
munchkinId: destConfig.MunchkinId,
authService: authService,
}

return &MarketoBulkUploader{
destName: destinationName,
logger: logger.NewLogger().Child("batchRouter").Child("AsyncDestinationManager").Child("Marketo").Child("Marketo Builk Upload"),
destinationConfig: destConfig,
dataHashToJobId: make(map[string]int64),
statsFactory: statsFactory,
apiService: apiService,
}
}
Loading
Loading