CRUD Salesforce
connectx.crud.salesforce
Salesforce
Bases: Rest
Salesforce connector using REST API.
Provides methods to read, create, update, delete, and integrate records in Salesforce using the REST API. Supports ETL logging, batching, and concurrent operations.
__init__
Initialize the Salesforce connector and authenticate with the API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `url` | `str` | Base URL of the Salesforce instance. | required |
| `client_id` | `str` | Salesforce client ID for authentication. | required |
| `client_secret` | `str` | Salesforce client secret for authentication. | required |
| `grant_type` | `str` | OAuth grant type. | `'client_credentials'` |
| `path` | `str` | Base path for Salesforce REST API requests. | `'/services/data/v62.0'` |
| `n_rows_write` | `int` | Maximum number of rows per batch write operation. | `200` |
| `n_concurrent` | `int` | Maximum number of concurrent requests. | `4` |
| `die_on_error` | `bool` | If True, raise exceptions on errors. Otherwise, log and continue. | `True` |
| `timeout` | | Request timeout in seconds. | `120` |
| `verify` | | SSL verification (True/False or path to CA bundle). | `None` |
| `cert` | | Path to client certificate if required. | `None` |
connect
Authenticate with Salesforce using OAuth2 client credentials.
If a valid token is cached, it will be reused. Otherwise, a new token will be requested from the Salesforce token endpoint.
Returns:
| Type | Description |
|---|---|
| | `self` |
Raises:
| Type | Description |
|---|---|
| `Exception` | If authentication fails. |
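The reuse-or-refresh behaviour described for `connect` can be sketched as below. This is a minimal, self-contained illustration and not the connector's actual code: `TokenCache`, `fetch_token`, the caller-supplied token lifetime, and the 60-second safety margin are all hypothetical choices made for the example.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Hypothetical helper: reuse a token until shortly before it expires."""

    def __init__(self, fetch_token: Callable[[], Tuple[str, float]], margin: float = 60.0):
        # fetch_token returns (access_token, lifetime_in_seconds). In a real
        # connector this is where the OAuth token endpoint would be called.
        self._fetch_token = fetch_token
        self._margin = margin
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = time.monotonic()
        # Refresh when nothing is cached or the cached token is about to expire.
        if self._token is None or now >= self._expires_at - self._margin:
            token, lifetime = self._fetch_token()
            self._token = token
            self._expires_at = now + lifetime
        return self._token


calls = []


def fake_fetch():
    # Stand-in for the network call; records how often it is invoked.
    calls.append(1)
    return f"token-{len(calls)}", 3600.0


cache = TokenCache(fake_fetch)
first = cache.get()
second = cache.get()  # served from the cache, no second fetch
```

Keeping the fetch function injectable makes the caching logic testable without a Salesforce org.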
create
Create a new record in a Salesforce object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `module` | `str` | Salesforce object name (sObject) | required |
| `data` | `dict` | Dictionary of field values to create the record | required |
Returns:
| Type | Description |
|---|---|
| `dict \| None` | The created record as a dictionary, or None if creation failed |
delete
Delete a record from a Salesforce object by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `module` | `str` | Salesforce object name (sObject) | required |
| `id` | `str` | ID of the record to delete | required |
Returns:
| Type | Description |
|---|---|
| `dict \| None` | Salesforce response as a dictionary, or None if deletion failed |
headers
Generate headers for authenticated Salesforce API requests.
Returns:
| Type | Description |
|---|---|
| `dict` | Dictionary containing the Authorization header with Bearer token |
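As a rough illustration of the returned shape, a bearer-token header dictionary can be built like this. `build_headers` is a hypothetical stand-alone helper, not the method itself, and the token value is a placeholder.

```python
def build_headers(access_token: str) -> dict:
    """Return the Authorization header for a bearer token (illustrative helper)."""
    return {"Authorization": f"Bearer {access_token}"}


headers = build_headers("example-token")
```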
integrate
Update a Salesforce record identified by a sync key. If a record matching the key exists, it is updated.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `module` | `str` | Salesforce object name (sObject) | required |
| `data` | `dict` | Dictionary of field values to update | required |
| `sync_key` | `str` | Field used as a unique identifier for integration | required |
Returns:
| Type | Description |
|---|---|
| `dict \| None` | Updated record as a dictionary, or None if operation failed |
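The Salesforce REST API can address a record by an external ID field with a `PATCH` to `/sobjects/{object}/{field}/{value}`. This reference does not state whether `integrate` uses that endpoint, so the following is only a sketch of how a request could be derived from `module`, `data`, and `sync_key`. `build_upsert_request` and the example field `External_Id__c` are hypothetical.

```python
from urllib.parse import quote


def build_upsert_request(module: str, data: dict, sync_key: str,
                         base_path: str = "/services/data/v62.0"):
    """Hypothetical helper: derive (method, path, body) for an external-ID request."""
    if sync_key not in data:
        raise KeyError(f"sync key {sync_key!r} is missing from data")
    # Percent-encode the key value so spaces and slashes are safe in the URL.
    key_value = quote(str(data[sync_key]), safe="")
    path = f"{base_path}/sobjects/{module}/{sync_key}/{key_value}"
    # Assumption: the key travels in the URL, so it is left out of the body.
    body = {k: v for k, v in data.items() if k != sync_key}
    return "PATCH", path, body


method, path, body = build_upsert_request(
    "Account", {"External_Id__c": "A 100", "Name": "Acme"}, "External_Id__c"
)
```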
job_end
Complete the current ETL job and update its status in Salesforce.
Checks all ETL logs associated with the current job. If all logs are
marked as completed, updates the ETL_Job__c record's Last_Completed__c
field with the start time of the last processed log. Resets internal
job and log references after completion.
Returns:
| Type | Description |
|---|---|
| | None |
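The completion check described above can be sketched as a pure function. This is an assumption-laden illustration: it treats each log as a dictionary, reuses the `Completed__c` and `Elaboration_Start__c` field names mentioned elsewhere in this reference, and assumes the list is ordered by processing time. `last_completed` is a hypothetical name.

```python
from typing import Optional


def last_completed(logs: list) -> Optional[str]:
    """Return the start time of the last log if every log is completed, else None."""
    if not logs or not all(log.get("Completed__c") for log in logs):
        return None
    # Assumption: logs are ordered, so the final entry is the last one processed.
    return logs[-1]["Elaboration_Start__c"]


done = [
    {"Completed__c": True, "Elaboration_Start__c": "2024-01-01T00:00:00Z"},
    {"Completed__c": True, "Elaboration_Start__c": "2024-01-01T00:05:00Z"},
]
pending = done + [{"Completed__c": False, "Elaboration_Start__c": "2024-01-01T00:10:00Z"}]

value_when_done = last_completed(done)
value_when_pending = last_completed(pending)
```

In this sketch a `None` result means the job record would be left untouched.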
job_save
Create or update an ETL job record in Salesforce.
Updates the last completed date, enables the job, and returns key metadata about the job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `id` | `str` | Salesforce ID of the ETL job (if updating an existing record) | `None` |
| `name` | `str` | Name of the ETL job (required if creating a new job) | `None` |
| `last_start_date` | `str` | Optional timestamp for the last completed run | `None` |
Returns:
| Type | Description |
|---|---|
| | Dictionary containing ETL job details including 'last_start_date', 'goback_hours', and 'enabled' |
job_start
Start an ETL job in Salesforce and initialize logs for the specified steps.
This method integrates an ETL_Job__c record using the provided name as a
unique sync key. If the job is enabled, it sets up ETL logs for each step
specified in the steps list. If the job is disabled, an exception is raised.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the ETL job | required |
| `steps` | `list` | Optional list of step names for which ETL logs will be created | `None` |
| `data` | `dict` | Optional dictionary of additional fields to include in the ETL job | `None` |
Returns:
| Type | Description |
|---|---|
| | Dictionary representing the ETL job record |
Raises:
| Type | Description |
|---|---|
| `JobNotEnabled` | If the ETL job is disabled in Salesforce |
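The enabled check and per-step log setup can be pictured with the sketch below. It is not the library's code: the `JobNotEnabled` class here is a local stand-in for the exception named above, and `init_step_logs` plus the dictionary keys `id`, `name`, and `enabled` are assumptions made for the example.

```python
class JobNotEnabled(Exception):
    """Local stand-in for the exception named in this reference."""


def init_step_logs(job: dict, steps=None) -> list:
    """Return one log stub per step, or raise if the job is disabled."""
    if not job.get("enabled", False):
        raise JobNotEnabled(f"ETL job {job.get('name')!r} is disabled")
    # steps is optional, so None is treated as an empty list.
    return [{"name": step, "job_id": job.get("id")} for step in (steps or [])]


logs = init_step_logs(
    {"id": "job-1", "name": "nightly", "enabled": True},
    steps=["extract", "load"],
)
```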
load
Perform bulk or standard operations on Salesforce records.
Supports CREATE, UPDATE, DELETE, INTEGRATE, and RETRIEVE actions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `module` | `str` | Salesforce object name (sObject) | required |
| `data` | `DataFrame` | Polars DataFrame containing records to process | required |
| `action` | `int` | Action to perform (CREATE, UPDATE, DELETE, INTEGRATE, RETRIEVE) | required |
| `sync_key` | `str` | External key for INTEGRATE action | `None` |
| `transform` | `callable` | Optional callable to transform each row before processing | `None` |
| `picklist` | `dict` | Optional dictionary mapping picklist values | `None` |
| `n_rows_write` | `int` | Maximum number of rows to send per request | `200` |
| `n_concurrent` | `int` | Maximum number of concurrent requests | `None` |
| `die_on_error` | `bool` | Raise exception if any record fails | `False` |
| `callback` | `callable` | Optional callable for progress reporting | `None` |
| `bulk` | `bool` | Whether to use Salesforce Bulk API for the operation | `False` |
Returns:
| Type | Description |
|---|---|
| `DataFrame` | Polars DataFrame with original data plus 'success' and 'message' columns |
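How `n_rows_write` and `n_concurrent` might interact can be sketched with the standard library alone. This is an illustration under stated assumptions, not the connector's implementation: plain dictionaries stand in for DataFrame rows, and `chunked`, `send_batch`, and `load_rows` are hypothetical helpers. `send_batch` fakes one API request and marks rows without an `Id` as failed.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(rows: list, size: int):
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def send_batch(batch: list) -> list:
    # Stand-in for one API request; rows without an "Id" are reported as failed.
    return [
        {**row, "success": "Id" in row, "message": "" if "Id" in row else "missing Id"}
        for row in batch
    ]


def load_rows(rows: list, n_rows_write: int = 200, n_concurrent: int = 4) -> list:
    batches = list(chunked(rows, n_rows_write))
    with ThreadPoolExecutor(max_workers=n_concurrent) as pool:
        # map() preserves batch order, so results line up with the input rows.
        results = pool.map(send_batch, batches)
        return [row for batch in results for row in batch]


rows = [{"Id": str(i)} for i in range(450)] + [{"Name": "no id"}]
out = load_rows(rows)
```

With 451 input rows and a batch size of 200, the sketch issues three batches and returns one result row per input row, each carrying `success` and `message` keys.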
log_create
Create a new ETL log entry in Salesforce for the current job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the log entry | required |
| `data` | `dict` | Optional dictionary of additional field values | `None` |
Returns:
| Type | Description |
|---|---|
| | Created ETL_Log__c record as a dictionary |
log_save
Create or update an ETL log record in Salesforce.
Handles logging of job progress, status, number of processed records, and optional files or row-level errors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `id` | `str` | Salesforce ID of the ETL log (if updating an existing record) | `None` |
| `name` | `str` | Name of the log | `None` |
| `job_id` | `str` | Salesforce ID of the parent ETL job | `None` |
| `log_id` | `str` | Salesforce ID of the parent log (for nested logs) | `None` |
| `status` | `str` | Status of the log (maps to Completed__c and Status__c) | `None` |
| `num_records` | | Total number of records intended for processing | `None` |
| `processed_records` | `int` | Number of records processed (optional, not always stored) | `None` |
| `success_records` | | Number of successfully processed records | `None` |
| `fail_records` | `int` | Number of failed records | `None` |
| `description` | `str` | Optional text description of the log | `None` |
| `start_date` | `str` | Optional start timestamp of the log | `None` |
| `end_date` | `str` | Optional end timestamp of the log | `None` |
| `files` | `list` | List of files to upload associated with the log | `None` |
| `rows` | `list` | List of Polars DataFrames containing row-level errors | `None` |
Returns:
| Type | Description |
|---|---|
| | Dictionary representing the created or updated ETL log record |
log_start
Start or resume an ETL log for a specific step of the current job.
If the ETL job is not already started, it initializes the job with this step.
Updates the Elaboration_Start__c timestamp for the log entry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the log step | required |
| `data` | `dict` | Optional dictionary of additional field values | `None` |
Returns:
| Type | Description |
|---|---|
| | Current ETL job record as a dictionary |
log_update
Update an existing ETL log entry with field values, status, file attachments, or row-level errors.
Handles updating the completion status, uploading files, creating child log entries for row errors, and can trigger job closure if the log corresponds to the main ETL job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict` | Dictionary of field values to update in the ETL log | `None` |
| `success` | `bool` | Boolean indicating whether the log step completed successfully | `None` |
| `files` | `list` | List of file contents to attach to the log | `None` |
| `rows` | `list` | List of Polars DataFrames containing row-level errors to record | `None` |
Returns:
| Type | Description |
|---|---|
| | Updated ETL_Log__c record as a dictionary |
ping
Check if the Salesforce API is reachable.
Returns:
| Type | Description |
|---|---|
| `bool` | True if API responds without error, False otherwise |
pk
Return the primary key field name for Salesforce objects.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `module` | `str` | Optional Salesforce object name | `None` |
Returns:
| Type | Description |
|---|---|
| `str` | Primary key field name (always "Id") |
query
Execute a SOQL query and return the results as a Polars DataFrame.
Supports standard and bulk queries, handles pagination, and flattens nested records.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `str` | SOQL query string to execute | required |
| `bulk` | `bool` | If True, use Salesforce Bulk API | `False` |
| `callback` | `callable` | Optional callback function for progress reporting | `None` |
Returns:
| Type | Description |
|---|---|
| `DataFrame` | Polars DataFrame with query results |
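Pagination and flattening can be sketched against the shape of a Salesforce REST query response, which carries `records`, `done`, and, when more pages exist, `nextRecordsUrl`. The code below is an illustration only: `flatten` and `fetch_all` are hypothetical helpers, the dotted column naming (`Account.Name`) is an assumption about how nested records could be flattened, and the two canned pages stand in for real API responses.

```python
def flatten(record: dict, prefix: str = "") -> dict:
    """Flatten nested relationship objects into dotted column names."""
    flat = {}
    for key, value in record.items():
        if key == "attributes":
            continue  # metadata added by Salesforce, not a field
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat


def fetch_all(get_page, first_path: str) -> list:
    """Follow nextRecordsUrl until the response reports done."""
    rows, path = [], first_path
    while path:
        page = get_page(path)
        rows.extend(flatten(r) for r in page["records"])
        path = None if page.get("done", True) else page.get("nextRecordsUrl")
    return rows


# Two canned pages keyed by placeholder paths.
pages = {
    "page-1": {
        "done": False,
        "nextRecordsUrl": "page-2",
        "records": [{
            "attributes": {"type": "Contact"},
            "Id": "003A",
            "Account": {"attributes": {"type": "Account"}, "Name": "Acme"},
        }],
    },
    "page-2": {
        "done": True,
        "records": [{"attributes": {"type": "Contact"}, "Id": "003B", "Account": None}],
    },
}

rows = fetch_all(pages.__getitem__, "page-1")
```

A list of flat dictionaries like `rows` is a convenient input for building a DataFrame.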
read
Read data from a Salesforce object (sObject) into a Polars DataFrame.
Builds a SOQL query automatically if fields are not provided. Supports filtering, ordering, limiting, and bulk mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `module` | `str` | Salesforce object name (sObject) to read from | required |
| `filter` | `str` | Optional SOQL WHERE clause | `None` |
| `fields` | `list` | List of fields to retrieve; all fields if None | `None` |
| `order_by` | `str` | Optional SOQL ORDER BY clause | `None` |
| `limit` | `int` | Optional limit for number of records | `None` |
| `bulk` | `bool` | If True, uses Salesforce Bulk API | `False` |
| `callback` | `callable` | Optional callback function for progress reporting | `None` |
Returns:
| Type | Description |
|---|---|
| `DataFrame` | Polars DataFrame containing retrieved records |
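The way the `read` parameters map onto SOQL clauses can be sketched as follows. `build_soql` is a hypothetical helper written for this example, not part of the connector. It requires an explicit field list and leaves out the "all fields" case, because this reference does not describe how that case is resolved.

```python
from typing import Optional, Sequence


def build_soql(module: str, fields: Sequence[str], filter: Optional[str] = None,
               order_by: Optional[str] = None, limit: Optional[int] = None) -> str:
    """Assemble a SOQL SELECT statement from its optional clauses."""
    if not fields:
        raise ValueError("at least one field is required in this sketch")
    parts = [f"SELECT {', '.join(fields)}", f"FROM {module}"]
    if filter:
        parts.append(f"WHERE {filter}")
    if order_by:
        parts.append(f"ORDER BY {order_by}")
    if limit is not None:
        parts.append(f"LIMIT {int(limit)}")
    return " ".join(parts)


soql = build_soql("Account", ["Id", "Name"], filter="Name LIKE 'A%'",
                  order_by="Name", limit=10)
```

The clauses are appended in the order SOQL expects: `WHERE`, then `ORDER BY`, then `LIMIT`.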
request
Make an HTTP request to the Salesforce API.
Handles JSON responses and status codes. Returns parsed JSON, True for 204 status, or the raw response object if parsing fails.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `method` | `str` | HTTP method (GET, POST, DELETE, etc.) | required |
| `path` | `str` | Salesforce API path | required |
| `headers` | `dict` | Optional headers dictionary | `None` |
| `kwargs` | | Additional arguments passed to requests | `{}` |
Returns:
| Type | Description |
|---|---|
| `dict \| bool \| Response` | Parsed JSON response, True for 204 No Content, or raw Response |
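The three possible return values can be illustrated with a small stand-alone function. `FakeResponse` and `parse_response` are hypothetical names for this sketch. A real response object would come from the HTTP client, and the connector's own handling of status codes may differ.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeResponse:
    """Minimal stand-in for an HTTP response object."""
    status_code: int
    text: str = ""


def parse_response(response):
    """Return True for 204, parsed JSON when possible, else the response itself."""
    if response.status_code == 204:
        return True
    try:
        return json.loads(response.text)
    except ValueError:
        # json.JSONDecodeError is a ValueError; fall back to the raw object.
        return response


no_content = parse_response(FakeResponse(204))
parsed = parse_response(FakeResponse(200, '{"id": "001"}'))
raw_in = FakeResponse(500, "<html>")
raw_out = parse_response(raw_in)
```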
retrieve
Retrieve a single record from a Salesforce object by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `module` | `str` | Salesforce object name (sObject) | required |
| `id` | `str` | ID of the record to retrieve | required |
Returns:
| Type | Description |
|---|---|
| `dict \| None` | Record data as a dictionary, or None if not found |
update
Update an existing record in a Salesforce object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `module` | `str` | Salesforce object name (sObject) | required |
| `data` | `dict` | Dictionary of field values to update | required |
| `id` | `str` | ID of the record to update; if None, will use ID from data | `None` |
Returns:
| Type | Description |
|---|---|
| `dict \| None` | Updated record as a dictionary, or None if update failed |
upload
Upload files or CSVs to a Salesforce record as ContentVersion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `module` | `str` | Salesforce object name (sObject) | required |
| `data` | `dict` | Dictionary where key is field name and value is file path or DataFrame | required |
| `id` | `str` | ID of the target record to attach files | required |
Returns:
| Type | Description |
|---|---|
| `dict \| None` | Response from Salesforce (currently None, can be extended to return uploaded IDs) |