# connectx Utilities

## connectx

### Etl

#### `__init__`

Initializes the Etl instance with the given environment settings.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| env | dict \| str | A dictionary containing environment settings used to configure the instance. Default is "dev"; to change the environment, pass the string "test" or "prod" on the command line when running main.py. | 'config' |
#### `end_job_error`

Function called when the run method raises an Exception. Closes the log if a sugar connection was created inside the run function. You can override this function in the job file.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ex | Exception | Exception raised by the run method. | required |
#### `get`

Retrieves the value associated with the given key from the config dictionary.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| key | | The key whose value needs to be retrieved. | required |
| default | | The value to return if the key is not found. Default is None. | None |

Returns:

| Type | Description |
|---|---|
| | The value associated with the given key if it exists, otherwise the default value. |
#### `logging_init`

Initialize logging.
#### `run`

Executes a series of dependent jobs and optionally sends notification emails on errors or fatal exceptions.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| jobs | tuple | Jobs to execute. Each job runs only if the previous one succeeded. | () |
| send_mail_on_error | bool | Flag indicating whether to send an email if any job fails. The email includes the exceptions or error files. | required |
| send_mail_on_fatal | bool | Flag indicating whether to send an email in case of a fatal error during job execution. | True |
| upload_limit | int \| None | Maximum number of error rows or files to upload. None: no upload; int: limit of errors to upload. | 1000 |
| parallel | bool | Whether to run jobs in parallel. | False |
| kwargs | dict | Additional parameters passed to jobs. | {} |

Returns:

| Type | Description |
|---|---|
| list | List of error files generated during job execution. |
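A minimal sketch of how a job script might drive `run`, assuming jobs are plain callables; the job names and their signatures below are illustrative, not part of connectx:

```python
from connectx import Etl

def extract(**kwargs):
    """Illustrative job: read source data."""

def load(**kwargs):
    """Illustrative job: write to the destination."""

if __name__ == "__main__":
    etl = Etl()  # environment defaults to "dev"; pass "test" or "prod" on the CLI
    error_files = etl.run(
        jobs=(extract, load),     # load runs only if extract succeeded
        send_mail_on_error=True,  # mail the exceptions or error files
        upload_limit=1000,        # cap the number of errors uploaded
    )
```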
#### `run_task`

Runs a single Task instance, initializing logging first.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task | Task | Task object to be executed. | required |

Returns:

| Type | Description |
|---|---|
| Any | Result of executing the task. |
#### `run_tasks`

Executes a list of Task objects, optionally in parallel using threads or processes.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tasks | list[Task] | List of Task objects to execute. | required |
| parallel | bool | Flag to enable parallel execution. | True |
| use_thread | bool | If True, uses ThreadPoolExecutor; otherwise ProcessPoolExecutor. | True |
| timeout | float \| None | Maximum time to wait for each task, in seconds. Defaults to None (no timeout). | None |
| max_workers | int \| None | Maximum number of worker threads/processes. Defaults to the number of tasks if None. | None |

Returns:

| Type | Description |
|---|---|
| list | List of results from executing each task. Each result can be any type, often a dict with error info. |
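A hedged usage sketch; how Task instances are constructed is not documented in this section, so the `Task(...)` calls below are placeholders:

```python
# tasks: list[Task] built elsewhere; constructor arguments are assumptions
tasks = [Task(...), Task(...)]
results = etl.run_tasks(
    tasks,
    parallel=True,
    use_thread=True,  # ThreadPoolExecutor; False would use processes
    timeout=300,      # per-task timeout in seconds
)
```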
#### `sendmail`

Sends an email with the specified details and attachments.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| to_addr | list[str] | A list of recipient email addresses. | required |
| subject | str | The subject of the email. | required |
| body | str | The body content of the email. | required |
| from_addr | str | The email address of the sender. | None |
| cc_addr | list[str] | A list of CC (carbon copy) recipient email addresses. Default is None. | None |
| bcc_addr | list[str] | A list of BCC (blind carbon copy) recipient email addresses. Default is None. | None |
| attachments | list | A list of file paths to be attached to the email. Default is None. | None |

Returns:

| Type | Description |
|---|---|
| bool | Returns True if the email was sent successfully, otherwise False. |
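A usage sketch based on the signature above; the addresses and paths are illustrative:

```python
ok = etl.sendmail(
    to_addr=["ops@example.com"],
    subject="ETL report",
    body="Run completed with 0 errors.",
    attachments=["data/export/report.csv"],
)
```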
### `cast_df_columns_utf8`

Forces all columns to the Utf8 type.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | pl.DataFrame | required |

Returns:

| Type | Description |
|---|---|
| DataFrame | pl.DataFrame |
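An illustrative sketch of the described behavior (an assumed equivalent, not necessarily the actual implementation):

```python
import polars as pl

def cast_all_utf8(df: pl.DataFrame) -> pl.DataFrame:
    # Cast every column to Utf8
    return df.with_columns(pl.all().cast(pl.Utf8))
```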
### `catcher`

Identifies rows with errors in the provided Polars DataFrames and returns a list of DataFrames containing only the erroneous rows.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| dataframe | DataFrame | A variable number of Polars DataFrame objects to be checked for errors. These DataFrames should be the response of a load() or link() function, since they must have a bool column called "success". | () |

Returns:

| Type | Description |
|---|---|
| list | A list containing as many DataFrames as were passed in. Each DataFrame in the list contains only the rows from the corresponding input DataFrame that have errors. |
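An illustrative sketch of the behavior described above (an assumed equivalent, based on the "success" column contract):

```python
import polars as pl

def catcher_sketch(*dataframes: pl.DataFrame) -> list[pl.DataFrame]:
    # Keep only the rows whose "success" flag is False
    return [df.filter(~pl.col("success")) for df in dataframes]
```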
### `check_requirements`

Checks that required packages are installed and satisfy the version constraints defined in a requirements file.

If the requirements file does not exist, it creates one with a default entry for 'connectx'. Raises an exception if a required package is missing or its version does not meet the specified constraint.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| req_file | str | Path to the requirements file. Defaults to "requirements.txt". | 'requirements.txt' |
| req_packages | list | List of package names to explicitly check. Defaults to ['connectx']. | None |

Raises:

| Type | Description |
|---|---|
| Exception | If a required package is not installed or does not satisfy version constraints. |
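A usage sketch based on the documented defaults; the extra package name is illustrative:

```python
check_requirements()  # checks requirements.txt, explicitly verifying connectx
check_requirements("requirements.txt", ["connectx", "polars"])
```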
### `concat`

Concatenates DataFrames that may differ in columns.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| dataframes | list[DataFrame] | A list of DataFrames to concatenate. | required |
| kwargs | | Additional keyword arguments to pass to the concat method: how: ConcatMethod = 'vertical', rechunk: bool = False, parallel: bool = True. See https://docs.pola.rs/api/python/stable/reference/api/polars.concat.html | {} |

Returns:

| Type | Description |
|---|---|
| DataFrame | The concatenated DataFrame. |
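A hypothetical usage sketch with frames whose columns differ; the extra keyword argument is forwarded to polars.concat:

```python
import polars as pl

a = pl.DataFrame({"id": [1], "name": ["Ada"]})
b = pl.DataFrame({"id": [2], "email": ["ada@example.com"]})
merged = concat([a, b], rechunk=True)  # columns are aligned across frames
```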
### `date_range`

Calculates the date range for the current week.

Returns:

| Type | Description |
|---|---|
| tuple | A tuple containing the start and end dates of the current week (Monday and Sunday) in YYYY-MM-DD format. |
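A minimal sketch of the documented behavior (an assumed equivalent implementation):

```python
from datetime import date, timedelta

def week_range() -> tuple[str, str]:
    today = date.today()
    monday = today - timedelta(days=today.weekday())  # start of the week
    sunday = monday + timedelta(days=6)               # end of the week
    return monday.isoformat(), sunday.isoformat()     # YYYY-MM-DD strings
```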
### `delete_files_backup`

Deletes files from the backup directory that are older than a specified number of days.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| backup_dir | str | The directory from which old backup files will be deleted. | 'data/backup' |
| older_than | int | The number of days after which files in the backup directory should be deleted. Default is 30 days. | 30 |
### `download_from_sftp`

Downloads files from an SFTP server to a local path.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| remote_path | str | The path to the file on the SFTP server that needs to be downloaded. | required |
| local_path | str | The local directory path where the file will be saved. Default is "data/import". | 'data/import' |
| host | str | The hostname or IP address of the SFTP server. Default is None. | None |
| port | int | The port number of the SFTP server. Default is None. | None |
| username | str | The username for authentication on the SFTP server. Default is None. | None |
| password | str | The password for authentication on the SFTP server. Default is None. | None |
| remove_from_server | bool | Whether to remove the downloaded files from the server. Default is True. | True |

Returns:

| Type | Description |
|---|---|
| list | A list of the downloaded files. |
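A hypothetical usage sketch; the host, credentials, and remote path are illustrative:

```python
files = download_from_sftp(
    remote_path="/outbound/export.csv",
    local_path="data/import",
    host="sftp.example.com",
    port=22,
    username="etl",
    password="secret",
    remove_from_server=False,  # keep the file on the server
)
```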
### `fill_empty`

Forces a value into every empty value.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | pl.DataFrame | required |
| value | str | The value to set for empty values. | required |

Returns:

| Type | Description |
|---|---|
| | pl.DataFrame |
### `get_installed_packages`

Returns a dictionary of installed Python packages with their versions.

Returns:

| Type | Description |
|---|---|
| dict[str, str] | Dictionary mapping package names (lowercased) to their version strings. |
### `move_and_delete_files_backup`

Moves files matching a specified pattern to a backup directory, appending a timestamp suffix to each file name, and deletes files older than a specified number of days from the backup directory.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| file_path | str \| list[str] | The path pattern of the files to be moved to the backup directory. | 'data/import/*;data/export/*' |
| suffix_pattern | str | The datetime pattern to append as a suffix to the file names in the backup directory. | '_%Y%m%d_%H%M' |
| older_than | int | The number of days after which files in the backup directory should be deleted. Default is 30. | 30 |
| backup_dir | str | The directory where files will be moved for backup. | 'data/backup/' |
| zip | bool | Zip on backup. Default is True. | True |
### `move_to_backup`

Moves files matching a specified pattern to a backup directory, appending a timestamp suffix to each file name.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| file_path | str \| list[str] | The path pattern of the files to be moved to the backup directory. | 'data/import/*;data/export/*' |
| suffix_pattern | str | The datetime pattern to append as a suffix to the file names in the backup directory. | '_%Y%m%d_%H%M' |
| backup_dir | str | The directory where files will be moved for backup. | 'data/backup' |
| zip | bool | Zip on backup. Default is True. | True |
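A usage sketch with the documented defaults made explicit:

```python
move_to_backup(
    file_path="data/import/*;data/export/*",  # ';'-separated glob patterns
    suffix_pattern="_%Y%m%d_%H%M",            # timestamp appended to each name
    backup_dir="data/backup",
    zip=True,                                 # compress the backed-up files
)
```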
### `picklist`

Reads a Polars DataFrame and creates a lookup dictionary.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| dataframe | DataFrame | The Polars DataFrame from which the lookup dictionary will be created. | required |
| columns | list | A list of column names of the DataFrame. There should always be a column for the field name, the from value, and the to value. If not provided, the DataFrame columns are used. | None |

Returns:

| Type | Description |
|---|---|
| dict | A dictionary where you can access the translation of a specific value, based on the input DataFrame. Example: picklist.get("field_name").get("from_value") |
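An illustrative example of building and querying the lookup; the column names and values below are assumptions following the field name / from value / to value layout:

```python
import polars as pl

mapping = pl.DataFrame({
    "field": ["status", "status"],    # field name
    "from":  ["A", "I"],              # source value
    "to":    ["Active", "Inactive"],  # translated value
})

pk = picklist(mapping, columns=["field", "from", "to"])
pk.get("status").get("A")  # -> "Active"
```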
### `read_csv`

Reads a CSV file and returns it as a Polars DataFrame with the given options.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| source | str | Source filename. | required |
| required | bool | Raises an exception if no file is found and required is True. | False |
| separator | str \| None | CSV separator. | ',' |
| reverse_sort | bool | Sort in reverse order if multiple files match. | False |
| start | int | Start from this index if multiple files match. | 0 |
| stop | int | Stop at this index if multiple files match. | None |
| kwargs | dict | Additional keyword arguments to pass to the CSV reader. These can include parameters like 'source', 'separator', 'has_header', 'encoding', etc., which control various aspects of the CSV reading process. See https://docs.pola.rs/api/python/stable/reference/api/polars.read_csv.html | {} |

Returns:

| Type | Description |
|---|---|
| | A Polars DataFrame containing the data read from the CSV file. |
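A hypothetical usage sketch; the glob pattern and options are illustrative:

```python
df = read_csv(
    "data/import/orders_*.csv",
    required=True,      # raise if no matching file is found
    separator=";",
    reverse_sort=True,  # process the newest files first when several match
)
```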
### `strip_chars`

Trim whitespace from all string columns.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | pl.DataFrame | required |

Returns:

| Type | Description |
|---|---|
| DataFrame | pl.DataFrame |
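An illustrative sketch of the described behavior (an assumed equivalent):

```python
import polars as pl

def strip_all(df: pl.DataFrame) -> pl.DataFrame:
    # Trim leading and trailing whitespace from every Utf8 column
    return df.with_columns(pl.col(pl.Utf8).str.strip_chars())
```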
### `transform`

Transforms a DataFrame with a callback and a picklist.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data | DataFrame | A DataFrame. | required |
| transform | callable | A callback function to apply. | required |
| picklist | dict | An optional dictionary. | None |

Returns:

| Type | Description |
|---|---|
| | A transformed DataFrame. |
### `update`

Updates the given dictionary with values from another dictionary or an iterable of key-value pairs.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| dic | dict | The dictionary to be updated. | required |
| upd | | A dictionary or an iterable of key-value pairs (e.g., a list of tuples) with updates to apply to the original dictionary. | required |

Returns:

| Type | Description |
|---|---|
| dict | The updated dictionary with the new values merged into the original dictionary. |
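A minimal sketch of the documented behavior; unlike dict.update, the merged dictionary is returned:

```python
base = {"host": "localhost", "port": 22}
merged = update(base, {"port": 2222})
# merged -> {"host": "localhost", "port": 2222}
```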
### `valid_email`

Validates an email address against a specified regex pattern.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| email | str | The email address to be validated. | required |
| pattern | str | The regex pattern used to validate the email address. | '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$' |

Returns:

| Type | Description |
|---|---|
| | True if the email address matches the pattern, False otherwise. |
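A usage sketch based on the default pattern:

```python
valid_email("user@example.com")  # -> True
valid_email("not-an-email")      # -> False
```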
### `write_csv`

Writes a Polars DataFrame to a CSV file with the given options.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The Polars DataFrame to be written to CSV. | required |
| file | str \| Path \| IO[str] \| IO[bytes] \| None | File path or writable file-like object to which the result will be written. If set to None, the CSV text is returned instead. | None |
| transform | callable | (optional) A transformation function to apply to the data before loading. Default is None. | None |
| picklist | dict | (optional) A dictionary used for mapping or lookup during the load process. Default is None. | None |
| create | bool | (optional) A boolean value for the create action. Default is False. | False |
| n_rows_write | int | The number of rows to write in each batch during the transform operation. Default is 1000. | 1000 |
| empty_value | str | Set this value for empty values. | None |
| kwargs | dict | Additional keyword arguments to pass to the CSV writer. These can include parameters like 'separator', 'quote_char', etc., which control various aspects of the CSV writing process. See https://docs.pola.rs/api/python/stable/reference/api/polars.DataFrame.write_csv.html | {} |

Returns:

| Type | Description |
|---|---|
| str | The path to the CSV file that was written, or the CSV text if file is None. |
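A hypothetical usage sketch based on the parameters documented above; the path and options are illustrative:

```python
path = write_csv(
    df,
    file="data/export/contacts.csv",
    empty_value="",     # write an empty string for empty values
    n_rows_write=1000,  # batch size used during the transform step
    separator=";",      # forwarded to the CSV writer via kwargs
)
```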