
connectx Utilities

connectx

Etl

__init__

Initializes the Etl instance with the given environment settings.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `env` | `dict \| str` | A dictionary containing environment settings used to configure the instance. The default environment is "dev"; to change it, pass the string "test" or "prod" on the command line when running main.py. | `'config'` |

end_job_error

Function called when the run method raises an Exception. It closes the log if a sugar connection was created inside the run function. You can override this function in the job file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `ex` | `Exception` | Exception raised by the run method. | required |

get

Retrieves the value associated with the given key from the config dictionary.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `key` | | The key whose value needs to be retrieved. | required |
| `default` | | The value to return if the key is not found. | `None` |

Returns:

| Type | Description |
| --- | --- |
| | The value associated with the given key if it exists, otherwise the default value. |
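A minimal sketch, assuming Etl is importable from the connectx package (the import path is not shown on this page):

```python
from connectx import Etl  # assumed import path

# "dev" is the documented default environment; pass "test" or "prod" to switch
etl = Etl(env="prod")

# Read a config value, falling back to a default if the key is absent
db_host = etl.get("db_host", default="localhost")
```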

logging_init

Initializes logging.

run

Executes a series of dependent jobs and optionally sends notification emails on errors or fatal exceptions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `jobs` | `tuple` | Jobs to execute. Each job runs only if the previous one succeeded. | `()` |
| `send_mail_on_error` | `bool` | Whether to send an email if any job fails; the email includes the exceptions or error files. | required |
| `send_mail_on_fatal` | `bool` | Whether to send an email in case of a fatal error during job execution. | `True` |
| `upload_limit` | `int \| None` | Maximum number of error rows or files to upload. None: no upload; int: limit of errors to upload. | `1000` |
| `parallel` | `bool` | Whether to run jobs in parallel. | `False` |
| `kwargs` | `dict` | Additional parameters passed to jobs. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `list` | List of error files generated during job execution. |
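A sketch of a typical entry point; the exact signature that jobs receive is not documented here, so the **kwargs shape below is an assumption:

```python
from connectx import Etl  # assumed import path

# Placeholder job callables for illustration
def extract(**kwargs):
    ...

def load(**kwargs):
    ...

etl = Etl()
# Each job runs only if the previous one succeeded
error_files = etl.run(
    jobs=(extract, load),
    send_mail_on_error=True,
    upload_limit=1000,
)
```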

run_task

Runs a single Task instance, initializing logging first.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `task` | `Task` | Task object to be executed. | required |

Returns:

| Type | Description |
| --- | --- |
| `Any` | Result of executing the task. |

run_tasks

Executes a list of Task objects, optionally in parallel using threads or processes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tasks` | `list[Task]` | List of Task objects to execute. | required |
| `parallel` | `bool` | Flag to enable parallel execution. | `True` |
| `use_thread` | `bool` | If True, uses ThreadPoolExecutor; otherwise ProcessPoolExecutor. | `True` |
| `timeout` | `float \| None` | Maximum time to wait for each task, in seconds; None means no timeout. | `None` |
| `max_workers` | `int \| None` | Maximum number of worker threads/processes; if None, defaults to the number of tasks. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `list` | List of results from executing each task. Each result can be of any type, often a dict with error info. |
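A parallel-execution sketch; the Task constructor signature is not documented on this page, so the callable-wrapping shown here is an assumption:

```python
from connectx import Etl, Task  # assumed import paths

def fetch(name):
    return {"name": name, "error": None}

etl = Etl()
# Hypothetical Task construction: check the Task class for its real signature
tasks = [Task(fetch, name) for name in ("orders", "customers")]

# Threads by default; set use_thread=False for processes
results = etl.run_tasks(tasks, parallel=True, use_thread=True, timeout=300)
```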

sendmail

Sends an email with the specified details and attachments.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `to_addr` | `list[str]` | A list of recipient email addresses. | required |
| `subject` | `str` | The subject of the email. | required |
| `body` | `str` | The body content of the email. | required |
| `from_addr` | `str` | The email address of the sender. | `None` |
| `cc_addr` | `list[str]` | A list of CC (carbon copy) recipient email addresses. | `None` |
| `bcc_addr` | `list[str]` | A list of BCC (blind carbon copy) recipient email addresses. | `None` |
| `attachments` | `list` | A list of file paths to attach to the email. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the email was sent successfully, otherwise False. |
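A usage sketch with the documented parameters; the module-level import is an assumption (sendmail may instead be an Etl method):

```python
from connectx import sendmail  # assumed import path

ok = sendmail(
    to_addr=["ops@example.com"],
    subject="ETL report",
    body="Run completed. Error files attached.",
    attachments=["data/export/errors.csv"],
)
if not ok:
    print("email delivery failed")
```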

cast_df_columns_utf8

Casts every column of the DataFrame to the Utf8 (string) type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | pl.DataFrame | required |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | pl.DataFrame |

catcher

Identifies rows with errors in the provided Polars DataFrames and returns a list of DataFrames containing only the erroneous rows.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataframe` | `DataFrame` | A variable number of Polars DataFrame objects to check for errors. These DataFrames should be the response of a load() or link() function, since they must have a bool column called "success". | `()` |

Returns:

| Type | Description |
| --- | --- |
| `list` | A list containing as many DataFrames as were passed in. Each DataFrame in the list contains only the rows from the corresponding input DataFrame that have errors. |
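A minimal sketch, assuming catcher is importable from connectx; the inputs carry the documented boolean "success" column:

```python
import polars as pl
from connectx import catcher  # assumed import path

# DataFrames shaped like a load()/link() response, with a bool "success" column
accounts = pl.DataFrame({"id": [1, 2], "success": [True, False]})
contacts = pl.DataFrame({"id": [3, 4], "success": [False, True]})

errors = catcher(accounts, contacts)
# errors[0] holds the failed rows of accounts, errors[1] those of contacts
```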

check_requirements

Checks that required packages are installed and satisfy version constraints defined in a requirements file.

If the requirements file does not exist, it creates one with a default entry for 'connectx'. Raises an exception if a required package is missing or its version does not meet the specified constraint.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `req_file` | `str` | Path to the requirements file. | `'requirements.txt'` |
| `req_packages` | `list` | List of package names to explicitly check; if None, defaults to ['connectx']. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If a required package is not installed or does not satisfy version constraints. |

concat

Concatenates DataFrames that may differ in columns.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataframes` | `list[DataFrame]` | A list of DataFrames to concatenate. | required |
| `kwargs` | | Additional keyword arguments to pass to the concat method: how: ConcatMethod = 'vertical', rechunk: bool = False, parallel: bool = True. See https://docs.pola.rs/api/python/stable/reference/api/polars.concat.html | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | |
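A sketch assuming concat is importable from connectx; since the inputs may differ in columns, a diagonal concat is the natural use case:

```python
import polars as pl
from connectx import concat  # assumed import path

a = pl.DataFrame({"id": [1], "name": ["Ada"]})
b = pl.DataFrame({"id": [2], "email": ["b@example.com"]})

# how is forwarded to polars.concat; "diagonal" fills missing columns with nulls
df = concat([a, b], how="diagonal")
```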

date_range

Computes the date range for the current week.

Returns:

| Type | Description |
| --- | --- |
| `tuple` | A tuple containing the start and end dates of the current week in YYYY-MM-DD format (Monday and Sunday). |
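A usage sketch, assuming date_range is importable from connectx:

```python
from connectx import date_range  # assumed import path

start, end = date_range()
# e.g. ("2024-01-01", "2024-01-07"): Monday through Sunday of the current week
print(f"week window: {start} .. {end}")
```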

delete_files_backup

Deletes files from the backup directory that are older than a specified number of days.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `backup_dir` | `str` | The directory from which old backup files will be deleted. | `'data/backup'` |
| `older_than` | `int` | The number of days after which files in the backup directory should be deleted. | `30` |

download_from_sftp

Downloads files from an SFTP server to a local path.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `remote_path` | `str` | The path to the file on the SFTP server to download. | required |
| `local_path` | `str` | The local directory path where the file will be saved. | `'data/import'` |
| `host` | `str` | The hostname or IP address of the SFTP server. | `None` |
| `port` | `int` | The port number of the SFTP server. | `None` |
| `username` | `str` | The username for authentication on the SFTP server. | `None` |
| `password` | `str` | The password for authentication on the SFTP server. | `None` |
| `remove_from_server` | `bool` | Whether to remove the file from the server after download. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `list` | A list of the files downloaded from the SFTP server to the local path. |
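A connection sketch using the documented parameters; host and credentials are placeholders:

```python
from connectx import download_from_sftp  # assumed import path

files = download_from_sftp(
    remote_path="/outbox/orders.csv",
    local_path="data/import",
    host="sftp.example.com",
    port=22,
    username="etl_user",
    password="********",
    remove_from_server=False,  # keep a copy on the server
)
```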

fill_empty

Replaces every empty value in the DataFrame with the given value.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | pl.DataFrame | required |
| `value` | `str` | The value to set in place of empty values. | required |

Returns:

| Type | Description |
| --- | --- |
| | pl.DataFrame |

get_installed_packages

Returns a dictionary of installed Python packages with their versions.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, str]` | Dictionary mapping package names (lowercased) to their version strings. |

move_and_delete_files_backup

Moves files matching a specified pattern to a backup directory, appending a timestamp suffix to each file name, and deletes files older than a specified number of days from the backup directory.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_path` | `str \| list[str]` | The path pattern of files to move to the backup directory. | `'data/import/*;data/export/*'` |
| `suffix_pattern` | `str` | The datetime pattern to append as a suffix to the file names in the backup directory. | `'_%Y%m%d_%H%M'` |
| `older_than` | `int` | The number of days after which files in the backup directory should be deleted. | `30` |
| `backup_dir` | `str` | The directory where files will be moved for backup. | `'data/backup/'` |
| `zip` | `bool` | Zip files on backup. | `True` |

move_to_backup

Moves files matching a specified pattern to a backup directory, appending a timestamp suffix to each file name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_path` | `str \| list[str]` | The path pattern of files to move to the backup directory. | `'data/import/*;data/export/*'` |
| `suffix_pattern` | `str` | The datetime pattern to append as a suffix to the file names in the backup directory. | `'_%Y%m%d_%H%M'` |
| `backup_dir` | `str` | The directory where files will be moved for backup. | `'data/backup'` |
| `zip` | `bool` | Zip files on backup. | `True` |
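A housekeeping sketch combining the two backup helpers, assuming both are importable from connectx:

```python
from connectx import move_to_backup, delete_files_backup  # assumed import paths

# Archive exported files with a timestamp suffix, zipping them on the way
move_to_backup("data/export/*.csv", backup_dir="data/backup", zip=True)

# Separately, prune backups older than 30 days
delete_files_backup(backup_dir="data/backup", older_than=30)
```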

picklist

Reads a Polars DataFrame and creates a lookup dictionary.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataframe` | `DataFrame` | The Polars DataFrame from which the lookup dictionary is created. | required |
| `columns` | `list` | A list of column names of the DataFrame. There should always be a column for the field name, the from value, and the to value. If not provided, the DataFrame columns are used. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `dict` | A dictionary for looking up the translation of a specific value, based on the input DataFrame. Example: picklist.get("field_name").get("from_value") |
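A sketch assuming picklist is importable from connectx; the column names used here ("field", "from", "to") are illustrative, not prescribed by the API:

```python
import polars as pl
from connectx import picklist  # assumed import path

# One row per mapping: field name, from value, to value
mapping_df = pl.DataFrame({
    "field": ["country", "country"],
    "from": ["IT", "FR"],
    "to": ["Italy", "France"],
})

lookup = picklist(mapping_df, columns=["field", "from", "to"])
lookup.get("country").get("IT")  # -> "Italy"
```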

read_csv

Reads a CSV file and returns it as a Polars DataFrame with the given options.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source` | `str` | Source filename. | required |
| `required` | `bool` | Raise an exception if no file was found and required is True. | `False` |
| `separator` | `str \| None` | CSV separator. | `','` |
| `reverse_sort` | `bool` | Sort in reverse order when multiple files match. | `False` |
| `start` | `int` | Index to start from when multiple files match. | `0` |
| `stop` | `int` | Index to stop at when multiple files match. | `None` |
| `kwargs` | | Additional keyword arguments to pass to the CSV reader, such as 'has_header' and 'encoding', which control various aspects of the CSV reading process. See https://docs.pola.rs/api/python/stable/reference/api/polars.read_csv.html | `{}` |

Returns:

| Type | Description |
| --- | --- |
| | A Polars DataFrame containing the data read from the CSV file. |
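A reading sketch; the glob-style source pattern is an inference from the multiple-files options (start, stop, reverse_sort):

```python
from connectx import read_csv  # assumed import path

df = read_csv(
    "data/import/orders_*.csv",
    required=True,            # raise if nothing matches
    separator=";",
    encoding="utf8-lossy",    # forwarded to polars.read_csv
)
```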

strip_chars

Trims whitespace from all string columns.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | pl.DataFrame | required |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | pl.DataFrame |

transform

Transforms a DataFrame with a callback and an optional picklist.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `DataFrame` | A DataFrame. | required |
| `transform` | `callable` | A callback function to apply. | required |
| `picklist` | `dict` | An optional dictionary. | `None` |

Returns:

| Type | Description |
| --- | --- |
| | A transformed DataFrame. |
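A sketch of a transform pipeline; the callback signature expected by transform is not documented here, so the row-plus-picklist shape below is an assumption:

```python
from connectx import read_csv, picklist, transform  # assumed import paths

def normalize(row, picklist):
    # Hypothetical callback shape; adjust to the real signature
    row["country"] = picklist.get("country", {}).get(row["country"], row["country"])
    return row

df = read_csv("data/import/orders.csv")
mapping = picklist(read_csv("data/import/country_map.csv"))
out = transform(df, transform=normalize, picklist=mapping)
```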

update

Updates the given dictionary with values from another dictionary or an iterable of key-value pairs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dic` | `dict` | The dictionary to be updated. | required |
| `upd` | | A dictionary or an iterable of key-value pairs (e.g., a list of tuples) with updates to apply to the original dictionary. | required |

Returns:

| Type | Description |
| --- | --- |
| `dict` | The updated dictionary with the new values merged into the original dictionary. |

valid_email

Validates an email address against a specified regex pattern.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `email` | `str` | The email address to validate. | required |
| `pattern` | `str` | The regex pattern used to validate the email address. | `'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'` |

Returns:

| Type | Description |
| --- | --- |
| | True if the email address matches the pattern, False otherwise. |
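A quick sketch, assuming valid_email is importable from connectx:

```python
from connectx import valid_email  # assumed import path

valid_email("user@example.com")  # True: matches the default pattern
valid_email("not-an-email")      # False
```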

write_csv

Writes a Polars DataFrame to a CSV file with the given options.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | The Polars DataFrame to be written to CSV. | required |
| `file` | `str \| Path \| IO[str] \| IO[bytes] \| None` | File path or writable file-like object to which the result will be written. If set to None (default), the output is returned as a string instead. | `None` |
| `transform` | `callable` | (optional) A transformation function to apply to the data before loading. | `None` |
| `picklist` | `dict` | (optional) A dictionary used for mapping or lookup during the load process. | `None` |
| `create` | `bool` | (optional) A boolean flag for the create action. | `False` |
| `n_rows_write` | `int` | The number of rows to write in each batch during the transform operation. | `1000` |
| `empty_value` | `str` | The value to set in place of empty values. | `None` |
| `kwargs` | | Additional keyword arguments to pass to the CSV writer, such as 'separator' and 'quote_char', which control various aspects of the CSV writing process. See https://docs.pola.rs/api/python/stable/reference/api/polars.DataFrame.write_csv.html | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `str` | The path of the CSV file that was written, or the CSV text if file is None. |
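A writing sketch, assuming write_csv is importable from connectx; separator is forwarded to the Polars CSV writer:

```python
import polars as pl
from connectx import write_csv  # assumed import path

df = pl.DataFrame({"id": [1, 2], "name": ["Ada", None]})

# Write to disk with a custom separator; empty cells become "N/A"
path = write_csv(df, file="data/export/out.csv", separator=";", empty_value="N/A")

# With file=None (the default) the CSV is returned as text instead
text = write_csv(df)
```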