CRUD Salesforce

connectx.crud.salesforce

Salesforce

Bases: Rest

Salesforce connector using REST API.

Provides methods to read, create, update, delete, and integrate records in Salesforce using the REST API. Supports ETL logging, batching, and concurrent operations.

__init__

Initialize the Salesforce connector and authenticate with the API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| url | str | Base URL of the Salesforce instance. | required |
| client_id | str | Salesforce client ID for authentication. | required |
| client_secret | str | Salesforce client secret for authentication. | required |
| grant_type | str | OAuth grant type. Default is "client_credentials". | 'client_credentials' |
| path | str | Base path for Salesforce REST API requests. | '/services/data/v62.0' |
| n_rows_write | int | Maximum number of rows per batch write operation. | 200 |
| n_concurrent | int | Maximum number of concurrent requests. | 4 |
| die_on_error | bool | If True, raise exceptions on errors. Otherwise, log and continue. | True |
| timeout | | Request timeout in seconds. | 120 |
| verify | | SSL verification (True/False or path to CA bundle). | None |
| cert | | Path to client certificate if required. | None |

connect

Authenticate with Salesforce using OAuth2 client credentials.

If a valid token is cached, it will be reused. Otherwise, a new token will be requested from the Salesforce token endpoint.

Returns:

| Type | Description |
| --- | --- |
| | self |

Raises:

| Type | Description |
| --- | --- |
| Exception | If authentication fails |
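
The token reuse described above can be sketched as a small cache. This is only an illustration: `TokenCache`, `fake_fetch`, and the `expires_in` field are hypothetical stand-ins, and the connector's real caching logic is not shown in this reference.

```python
import time
from typing import Callable, Optional


class TokenCache:
    """Hypothetical helper: keeps a bearer token until it expires."""

    def __init__(self, fetch: Callable[[], dict], clock: Callable[[], float] = time.time):
        self._fetch = fetch      # callable that requests a new token
        self._clock = clock      # injectable clock, handy for testing
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Reuse the cached token while it is still valid.
        if self._token is not None and self._clock() < self._expires_at:
            return self._token
        # Otherwise request a fresh one and remember when it expires.
        payload = self._fetch()
        self._token = payload["access_token"]
        self._expires_at = self._clock() + payload.get("expires_in", 0)
        return self._token


calls = []


def fake_fetch() -> dict:
    # Stand-in for the request to the token endpoint; the payload is made up.
    calls.append(1)
    return {"access_token": f"token-{len(calls)}", "expires_in": 60}


now = [1000.0]
cache = TokenCache(fake_fetch, clock=lambda: now[0])
first = cache.get()    # fetches a token
second = cache.get()   # served from the cache
now[0] += 120          # simulate the token expiring
third = cache.get()    # fetches again
```

Injecting the clock keeps the expiry logic testable without sleeping.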

create

Create a new record in a Salesforce object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| module | str | Salesforce object name (sObject) | required |
| data | dict | Dictionary of field values to create the record | required |

Returns:

| Type | Description |
| --- | --- |
| dict \| None | The created record as a dictionary, or None if creation failed |

delete

Delete a record from a Salesforce object by ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| module | str | Salesforce object name (sObject) | required |
| id | str | ID of the record to delete | required |

Returns:

| Type | Description |
| --- | --- |
| dict \| None | Salesforce response as a dictionary, or None if deletion failed |

headers

Generate headers for authenticated Salesforce API requests.

Returns:

| Type | Description |
| --- | --- |
| dict | Dictionary containing the Authorization header with Bearer token |
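
As a minimal sketch of the returned shape, assuming the token is available as a plain string (`build_headers` is a hypothetical name, not the connector's method):

```python
def build_headers(access_token: str) -> dict:
    """Return the Authorization header for a bearer token."""
    return {"Authorization": f"Bearer {access_token}"}


headers = build_headers("00Dxx0000000000")  # made-up token value
```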

integrate

Update a Salesforce record identified by a sync key. If a record with that key value exists, it is updated.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| module | str | Salesforce object name (sObject) | required |
| data | dict | Dictionary of field values to update | required |
| sync_key | str | Field used as a unique identifier for integration | required |

Returns:

| Type | Description |
| --- | --- |
| dict \| None | Updated record as a dictionary, or None if operation failed |
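
The Salesforce REST API exposes key-based updates through a PATCH on `/sobjects/{sObject}/{field}/{value}`. Whether `integrate` uses exactly this endpoint is an assumption; the sketch below only shows how such a request could be assembled. `upsert_request` and the `External_Id__c` field are hypothetical.

```python
from urllib.parse import quote


def upsert_request(module: str, data: dict, sync_key: str,
                   base_path: str = "/services/data/v62.0") -> tuple:
    """Return (method, path, body) for a request keyed on `sync_key`."""
    if sync_key not in data:
        raise KeyError(f"sync key {sync_key!r} missing from data")
    # URL-encode the key value so spaces and slashes are safe in the path.
    key_value = quote(str(data[sync_key]), safe="")
    # In this sketch the key travels in the URL; the other fields form the body.
    body = {k: v for k, v in data.items() if k != sync_key}
    path = f"{base_path}/sobjects/{module}/{sync_key}/{key_value}"
    return "PATCH", path, body


method, path, body = upsert_request(
    "Account", {"External_Id__c": "A 42", "Name": "Acme"}, "External_Id__c"
)
```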

job_end

Complete the current ETL job and update its status in Salesforce.

Checks all ETL logs associated with the current job. If all logs are marked as completed, updates the ETL_Job__c record's Last_Completed__c field with the start time of the last processed log. Resets internal job and log references after completion.

Returns:

| Type | Description |
| --- | --- |
| | None |
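
The completion check described above can be sketched with plain dictionaries. The field names `Completed__c` and `Elaboration_Start__c` appear elsewhere in this reference; treating the final list element as the "last processed" log, and the helper name `last_completed_value`, are assumptions.

```python
from typing import Optional


def last_completed_value(logs: list) -> Optional[str]:
    """Return the value to store in Last_Completed__c, or None to skip the update."""
    # Only close the job when there are logs and every one is completed.
    if not logs or not all(log.get("Completed__c") for log in logs):
        return None
    # "Last processed" is taken here as the final element of the list (assumption).
    return logs[-1].get("Elaboration_Start__c")


done_logs = [
    {"Completed__c": True, "Elaboration_Start__c": "2024-01-01T00:00:00Z"},
    {"Completed__c": True, "Elaboration_Start__c": "2024-01-01T00:05:00Z"},
]
pending_logs = done_logs[:1] + [{"Completed__c": False}]
```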

job_save

Create or update an ETL job record in Salesforce.

Updates the last completed date, enables the job, and returns key metadata about the job.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| id | str | Salesforce ID of the ETL job (if updating an existing record) | None |
| name | str | Name of the ETL job (required if creating a new job) | None |
| last_start_date | str | Optional timestamp for the last completed run | None |

Returns:

| Type | Description |
| --- | --- |
| | Dictionary containing ETL job details including 'last_start_date', 'goback_hours', and 'enabled' |

job_start

Start an ETL job in Salesforce and initialize logs for the specified steps.

This method integrates an ETL_Job__c record using the provided name as a unique sync key. If the job is enabled, it sets up ETL logs for each step specified in the steps list. If the job is disabled, an exception is raised.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the ETL job | required |
| steps | list | Optional list of step names for which ETL logs will be created | None |
| data | dict | Optional dictionary of additional fields to include in the ETL job | None |

Returns:

| Type | Description |
| --- | --- |
| | Dictionary representing the ETL job record |

Raises:

| Type | Description |
| --- | --- |
| JobNotEnabled | If the ETL job is disabled in Salesforce |
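
The enabled check and per-step log setup can be sketched as follows. `JobNotEnabled` here is a local stand-in for the connector's exception of the same name, and `plan_logs` plus the dictionary keys are hypothetical.

```python
class JobNotEnabled(Exception):
    """Local stand-in for the connector's exception of the same name."""


def plan_logs(job: dict, steps=None) -> list:
    """Return one log stub per step, or raise if the job is disabled."""
    if not job.get("enabled"):
        raise JobNotEnabled(f"ETL job {job.get('name')!r} is disabled")
    # steps is optional; no steps means no logs are prepared.
    return [{"name": step, "job_id": job.get("id")} for step in (steps or [])]


job = {"id": "a01xx", "name": "nightly", "enabled": True}  # made-up record
logs = plan_logs(job, ["extract", "load"])
```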

load

Perform bulk or standard operations on Salesforce records.

Supports CREATE, UPDATE, DELETE, INTEGRATE, and RETRIEVE actions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| module | str | Salesforce object name (sObject) | required |
| data | DataFrame | Polars DataFrame containing records to process | required |
| action | int | Action to perform (CREATE, UPDATE, DELETE, INTEGRATE, RETRIEVE) | required |
| sync_key | str | External key for INTEGRATE action | None |
| transform | callable | Optional callable to transform each row before processing | None |
| picklist | dict | Optional dictionary mapping picklist values | None |
| n_rows_write | int | Maximum number of rows to send per request | 200 |
| n_concurrent | int | Maximum number of concurrent requests | None |
| die_on_error | bool | Raise exception if any record fails | False |
| callback | callable | Optional callable for progress reporting | None |
| bulk | bool | Whether to use Salesforce Bulk API for the operation | False |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | Polars DataFrame with original data plus 'success' and 'message' columns |
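
How `n_rows_write`, `n_concurrent`, and `die_on_error` might interact can be sketched with the standard library. Plain lists of dictionaries stand in for the Polars DataFrame, and `chunk`, `send_batch`, and `load_rows` are hypothetical; the connector's own batching is not documented here.

```python
from concurrent.futures import ThreadPoolExecutor


def chunk(rows: list, size: int) -> list:
    """Split rows into consecutive batches of at most `size` items."""
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def send_batch(batch: list) -> list:
    # Stand-in for one API request; rows lacking "Name" are flagged as failed.
    return [
        {**row, "success": "Name" in row,
         "message": "" if "Name" in row else "missing Name"}
        for row in batch
    ]


def load_rows(rows, n_rows_write=200, n_concurrent=4, die_on_error=False):
    batches = chunk(rows, n_rows_write)
    with ThreadPoolExecutor(max_workers=n_concurrent) as pool:
        # map() yields results in input order even when batches run concurrently.
        results = [r for batch in pool.map(send_batch, batches) for r in batch]
    if die_on_error and not all(r["success"] for r in results):
        raise RuntimeError("at least one record failed")
    return results


rows = [{"Name": f"Acme {i}"} for i in range(450)] + [{"Phone": "555"}]
results = load_rows(rows, n_rows_write=200, n_concurrent=2)
```

With `die_on_error=False`, the failed row is reported through its `success` and `message` values instead of stopping the run.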

log_create

Create a new ETL log entry in Salesforce for the current job.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the log entry | required |
| data | dict | Optional dictionary of additional field values | None |

Returns:

| Type | Description |
| --- | --- |
| | Created ETL_Log__c record as a dictionary |

log_save

Create or update an ETL log record in Salesforce.

Handles logging of job progress, status, number of processed records, and optional files or row-level errors.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| id | str | Salesforce ID of the ETL log (if updating an existing record) | None |
| name | str | Name of the log | None |
| job_id | str | Salesforce ID of the parent ETL job | None |
| log_id | str | Salesforce ID of the parent log (for nested logs) | None |
| status | str | Status of the log (maps to Completed__c and Status__c) | None |
| num_records | | Total number of records intended for processing | None |
| processed_records | int | Number of records processed (optional, not always stored) | None |
| success_records | | Number of successfully processed records | None |
| fail_records | int | Number of failed records | None |
| description | str | Optional text description of the log | None |
| start_date | str | Optional start timestamp of the log | None |
| end_date | str | Optional end timestamp of the log | None |
| files | list | List of files to upload associated with the log | None |
| rows | list | List of Polars DataFrames containing row-level errors | None |

Returns:

| Type | Description |
| --- | --- |
| | Dictionary representing the created or updated ETL log record |

log_start

Start or resume an ETL log for a specific step of the current job.

If the ETL job has not been started yet, this method initializes it with this step. It then updates the Elaboration_Start__c timestamp of the log entry.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the log step | required |
| data | dict | Optional dictionary of additional field values | None |

Returns:

| Type | Description |
| --- | --- |
| | Current ETL job record as a dictionary |

log_update

Update an existing ETL log entry with field values, status, file attachments, or row-level errors.

Handles updating the completion status, uploading files, creating child log entries for row errors, and can trigger job closure if the log corresponds to the main ETL job.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | dict | Dictionary of field values to update in the ETL log | None |
| success | bool | Boolean indicating whether the log step completed successfully | None |
| files | list | List of file contents to attach to the log | None |
| rows | list | List of Polars DataFrames containing row-level errors to record | None |

Returns:

| Type | Description |
| --- | --- |
| | Updated ETL_Log__c record as a dictionary |

ping

Check if the Salesforce API is reachable.

Returns:

| Type | Description |
| --- | --- |
| bool | True if API responds without error, False otherwise |

pk

Return the primary key field name for Salesforce objects.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| module | str | Optional Salesforce object name | None |

Returns:

| Type | Description |
| --- | --- |
| str | Primary key field name (always "Id") |

query

Execute a SOQL query and return the results as a Polars DataFrame.

Supports standard and bulk queries, handles pagination, and flattens nested records.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | str | SOQL query string to execute | required |
| bulk | bool | If True, use Salesforce Bulk API | False |
| callback | callable | Optional callback function for progress reporting | None |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | Polars DataFrame with query results |
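
Pagination and flattening can be sketched against the shape of a Salesforce REST query response (`records`, `done`, `nextRecordsUrl`, and a per-record `attributes` block). The helper names, the dotted column naming, and the canned `PAGES` data are assumptions; the connector may name flattened columns differently.

```python
def flatten(record: dict, prefix: str = "") -> dict:
    """Flatten nested relationship records into dotted keys."""
    flat = {}
    for key, value in record.items():
        if key == "attributes":
            continue  # metadata block Salesforce adds to each record
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{prefix}{key}."))
        else:
            flat[f"{prefix}{key}"] = value
    return flat


def fetch_all(get_page, first_path: str) -> list:
    """Follow nextRecordsUrl until the response reports done."""
    rows, path = [], first_path
    while path:
        page = get_page(path)
        rows.extend(flatten(r) for r in page["records"])
        path = None if page.get("done", True) else page.get("nextRecordsUrl")
    return rows


# Canned pages standing in for HTTP responses (made-up data).
PAGES = {
    "/query?q=1": {
        "done": False, "nextRecordsUrl": "/query/01g-2000",
        "records": [{"attributes": {"type": "Contact"}, "Id": "003A",
                     "Account": {"attributes": {"type": "Account"}, "Name": "Acme"}}],
    },
    "/query/01g-2000": {
        "done": True,
        "records": [{"attributes": {"type": "Contact"}, "Id": "003B", "Account": None}],
    },
}
rows = fetch_all(PAGES.__getitem__, "/query?q=1")
```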

read

Read data from a Salesforce object (sObject) into a Polars DataFrame.

Builds a SOQL query automatically if fields are not provided. Supports filtering, ordering, limiting, and bulk mode.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| module | str | Salesforce object name (sObject) to read from | required |
| filter | str | Optional SOQL WHERE clause | None |
| fields | list | List of fields to retrieve; all fields if None | None |
| order_by | str | Optional SOQL ORDER BY clause | None |
| limit | int | Optional limit for number of records | None |
| bulk | bool | If True, uses Salesforce Bulk API | False |
| callback | callable | Optional callback function for progress reporting | None |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | Polars DataFrame containing retrieved records |
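
The query assembly can be sketched as simple string building. `build_soql` is a hypothetical helper; how the connector resolves "all fields" when `fields` is None is not documented here, so this sketch requires an explicit field list. The parameter name `filter` mirrors the table above and shadows the Python builtin inside the function.

```python
def build_soql(module, fields, filter=None, order_by=None, limit=None) -> str:
    """Assemble a SOQL SELECT statement from its optional parts."""
    if not fields:
        raise ValueError("this sketch needs an explicit field list")
    soql = f"SELECT {', '.join(fields)} FROM {module}"
    if filter:
        soql += f" WHERE {filter}"
    if order_by:
        soql += f" ORDER BY {order_by}"
    if limit is not None:
        soql += f" LIMIT {int(limit)}"
    return soql


soql = build_soql("Account", ["Id", "Name"],
                  filter="Name LIKE 'A%'", order_by="Name", limit=10)
```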

request

Make an HTTP request to the Salesforce API.

Handles JSON responses and status codes. Returns parsed JSON, True for 204 status, or the raw response object if parsing fails.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| method | str | HTTP method (GET, POST, DELETE, etc.) | required |
| path | str | Salesforce API path | required |
| headers | dict | Optional headers dictionary | None |
| kwargs | | Additional arguments passed to requests | {} |

Returns:

| Type | Description |
| --- | --- |
| dict \| bool \| Response | Parsed JSON response, True for 204 No Content, or raw Response |
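
The three return cases can be sketched without a network call. `FakeResponse` is a minimal stand-in for an HTTP response object and `parse_response` is a hypothetical helper, not the connector's code.

```python
import json


class FakeResponse:
    """Minimal stand-in for an HTTP response object."""

    def __init__(self, status_code: int, text: str = ""):
        self.status_code = status_code
        self.text = text

    def json(self):
        return json.loads(self.text)


def parse_response(response):
    # 204 No Content carries no body, so report success as True.
    if response.status_code == 204:
        return True
    try:
        return response.json()
    except ValueError:
        # Body is not valid JSON: hand back the raw response object.
        return response


raw = FakeResponse(200, "<html>not json</html>")
```

Callers therefore need to check the type of the result before indexing into it.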

retrieve

Retrieve a single record from a Salesforce object by ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| module | str | Salesforce object name (sObject) | required |
| id | str | ID of the record to retrieve | required |

Returns:

| Type | Description |
| --- | --- |
| dict \| None | Record data as a dictionary, or None if not found |

update

Update an existing record in a Salesforce object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| module | str | Salesforce object name (sObject) | required |
| data | dict | Dictionary of field values to update | required |
| id | str | ID of the record to update; if None, will use ID from data | None |

Returns:

| Type | Description |
| --- | --- |
| dict \| None | Updated record as a dictionary, or None if update failed |
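
The ID fallback can be sketched as below. `resolve_update` is hypothetical, and reading the ID from an `"Id"` key in `data` is an assumption based on the primary key name reported by `pk`. The sketch also strips `Id` from the body it returns, which is a choice of this example.

```python
def resolve_update(data: dict, id=None) -> tuple:
    """Return (record_id, body), taking the ID from data when not given."""
    record_id = id if id is not None else data.get("Id")
    if record_id is None:
        raise ValueError("no record ID given and none found in data")
    # Keep the ID out of the field values that would be sent.
    body = {k: v for k, v in data.items() if k != "Id"}
    return record_id, body


explicit = resolve_update({"Name": "Acme"}, id="001A")
from_data = resolve_update({"Id": "001B", "Name": "Acme"})
```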

upload

Upload files or CSVs to a Salesforce record as ContentVersion.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| module | str | Salesforce object name (sObject) | required |
| data | dict | Dictionary where key is field name and value is file path or DataFrame | required |
| id | str | ID of the target record to attach files | required |

Returns:

| Type | Description |
| --- | --- |
| dict \| None | Response from Salesforce (currently None, can be extended to return uploaded IDs) |