CRUD SQL

connectx.crud.sql

SQL

Bases: Crud

Connector class for SQL databases.

Performs CRUD operations on SQL databases using SQLAlchemy and Polars.

__init__

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `uri` | `str` | Database URI. | required |
| `username` | `str` | Username for database connection. | required |
| `password` | `str` | Password for database connection. | required |
| `schema_name` | `str` | Schema name to use (optional). | `None` |
| `primary_key` | `str` | Default primary key for tables. | `'id'` |
| `n_rows_write` | `int` | Number of rows per batch when writing data. | `100` |
| `timeout` | `int` | Connection timeout in seconds. | `3` |

connect

Establish a connection to the configured database.

Returns:

| Type | Description |
| --- | --- |
| | Instance of SQL with an active connection. |

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If the connection fails. |

count

Count the number of records in the specified query or table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `module` | `str` | SQL query or table name. | required |

Returns:

| Type | Description |
| --- | --- |
| `int` | Total number of records. |
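
Because `module` may be either a table name or a full query, one plausible way to count both is to wrap a query in a subquery. The sketch below is not connectx code: it uses the standard-library `sqlite3` module, the helper name `count_records` is hypothetical, and the "starts with SELECT" test is an assumption about how the two cases could be told apart.

```python
import sqlite3


def count_records(conn: sqlite3.Connection, module: str) -> int:
    """Count rows for a table name or a SELECT statement (hypothetical helper)."""
    text = module.strip().rstrip(";")
    # Assumption: anything starting with SELECT is a query, otherwise a table name.
    if text.lower().startswith("select"):
        sql = f"SELECT COUNT(*) FROM ({text}) AS counted"
    else:
        sql = f"SELECT COUNT(*) FROM {text}"
    return conn.execute(sql).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, active INTEGER)")
conn.executemany("INSERT INTO users (active) VALUES (?)", [(1,), (0,), (1,)])

table_total = count_records(conn, "users")
query_total = count_records(conn, "SELECT * FROM users WHERE active = 1")
print(table_total, query_total)
```

Table and column names cannot be bound as parameters, so a helper like this should only receive trusted identifiers.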

create

Create a new record in the specified table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `module` | `str` | Module or table name. | required |
| `data` | `dict` | Dictionary of data to insert. | required |
| `primary_key` | `str \| list[str]` | Primary key for the record (optional). | `None` |

Returns:

| Type | Description |
| --- | --- |
| | Dictionary with the newly created record. |

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If insertion fails. |
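
The insert-then-return behavior described above can be illustrated with a minimal sketch. This is not the connectx implementation: it uses the standard-library `sqlite3` module, the helper name `create_record` is hypothetical, and reading the new row back through `lastrowid` is an assumption that only holds for an integer primary key.

```python
import sqlite3


def create_record(conn, module, data, primary_key="id"):
    """Insert one row from a dict and return the stored row as a dict."""
    columns = ", ".join(data)
    placeholders = ", ".join("?" for _ in data)
    cur = conn.execute(
        f"INSERT INTO {module} ({columns}) VALUES ({placeholders})",
        tuple(data.values()),
    )
    # Assumption: the primary key is an auto-assigned integer, so lastrowid finds the row.
    row = conn.execute(
        f"SELECT * FROM {module} WHERE {primary_key} = ?", (cur.lastrowid,)
    ).fetchone()
    return dict(row)


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets rows convert to dicts
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

created = create_record(conn, "users", {"name": "Ada"})
print(created)
```

Values are bound as parameters; only the table and column names are interpolated, so they must come from trusted input.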

delete

Delete a record from the table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `module` | `str` | Module or table name. | required |
| `id` | `str \| int \| dict \| list[str \| int]` | Identifier of the record to delete. | required |
| `primary_key` | `str \| list[str]` | Primary key of the record (optional). | `None` |

Returns:

| Type | Description |
| --- | --- |
| | True if deletion was successful. |
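
One way to derive a success flag for a delete is to check how many rows the statement affected. The sketch below uses the standard-library `sqlite3` module rather than connectx; `delete_record` is a hypothetical name, only a scalar `id` is handled, and returning False when nothing matched is an assumption, since the reference does not say what happens in that case.

```python
import sqlite3


def delete_record(conn, module, id, primary_key="id"):
    """Delete the row with the given key; report whether a row was removed."""
    cur = conn.execute(f"DELETE FROM {module} WHERE {primary_key} = ?", (id,))
    # Assumption: "successful" means at least one row was actually deleted.
    return cur.rowcount > 0


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users (name) VALUES ('Ada')")

first = delete_record(conn, "users", 1)   # the row exists
second = delete_record(conn, "users", 1)  # the row is already gone
print(first, second)
```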

integrate

Integrate data into the database: update the record if it exists, otherwise create it.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `module` | `str` | Module or table name. | required |
| `data` | `dict` | Dictionary with data to integrate. | required |
| `sync_key` | `str \| dict \| list[str]` | Key(s) used for synchronization. | required |
| `primary_key` | `str \| list[str]` | Primary key of the record (optional). | `None` |

Returns:

| Type | Description |
| --- | --- |
| | Dictionary with the integrated record data. |
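
The update-or-create logic can be sketched as a lookup on the synchronization key followed by either an UPDATE or an INSERT. This is an illustration with the standard-library `sqlite3` module, not the connectx implementation. The name `integrate_record` is hypothetical, and only the `str` and `list[str]` forms of `sync_key` are covered; how a `dict` is interpreted is not stated in this reference.

```python
import sqlite3


def integrate_record(conn, module, data, sync_key):
    """Update the row matching sync_key if it exists, otherwise insert it."""
    keys = [sync_key] if isinstance(sync_key, str) else list(sync_key)
    where = " AND ".join(f"{key} = ?" for key in keys)
    key_values = [data[key] for key in keys]

    exists = conn.execute(
        f"SELECT 1 FROM {module} WHERE {where}", key_values
    ).fetchone()
    if exists:
        assignments = ", ".join(f"{col} = ?" for col in data)
        conn.execute(
            f"UPDATE {module} SET {assignments} WHERE {where}",
            [*data.values(), *key_values],
        )
    else:
        columns = ", ".join(data)
        marks = ", ".join("?" for _ in data)
        conn.execute(
            f"INSERT INTO {module} ({columns}) VALUES ({marks})",
            list(data.values()),
        )
    row = conn.execute(f"SELECT * FROM {module} WHERE {where}", key_values).fetchone()
    return dict(row)


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, name TEXT)")

first = integrate_record(conn, "users", {"email": "ada@example.org", "name": "Ada"}, "email")
second = integrate_record(conn, "users", {"email": "ada@example.org", "name": "Ada Lovelace"}, "email")
print(first, second)
```

The second call finds the existing email and updates it in place, so the table still holds a single row.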

load

Load data into a table with a specified action (CREATE, UPDATE, DELETE, INTEGRATE).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `module` | `str` | Table name. | required |
| `data` | `DataFrame` | Polars DataFrame with data to load. | required |
| `action` | `int` | Type of action to perform (CREATE, UPDATE, DELETE, INTEGRATE). | required |
| `sync_key` | `str \| list[str]` | Key(s) for synchronization (optional). | `None` |
| `primary_key` | `str \| list[str]` | Primary key of the record (optional). | `None` |
| `transform` | | Optional function to transform data before loading. | `None` |
| `picklist` | `dict` | Optional dictionary of picklist values. | `None` |
| `n_rows_write` | `int` | Number of rows per batch (optional). | `None` |
| `die_on_error` | `bool` | If True, stops execution on error. | `True` |
| `callback` | `callable` | Optional callback function to track progress. | `None` |

Returns:

| Type | Description |
| --- | --- |
| | Polars DataFrame with the results of the operation. |
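
How `n_rows_write`, `die_on_error`, and `callback` might interact is easier to see in a small pure-Python sketch. Everything here is an assumption made for illustration: the helper `load_in_batches`, the per-row `handler`, the `status` and `error` result fields, and the `(done, total)` callback signature are hypothetical. The sketch returns a list of dicts, whereas `load` returns a Polars DataFrame.

```python
def load_in_batches(rows, handler, n_rows_write=100, die_on_error=True, callback=None):
    """Apply handler to each row in batches and collect one result per row."""
    results = []
    total = len(rows)
    for start in range(0, total, n_rows_write):
        for row in rows[start:start + n_rows_write]:
            try:
                handler(row)
                results.append({**row, "status": "ok", "error": None})
            except Exception as exc:
                if die_on_error:
                    raise  # stop the whole load on the first failure
                results.append({**row, "status": "error", "error": str(exc)})
        if callback is not None:
            # Hypothetical progress signature: rows processed so far, total rows.
            callback(min(start + n_rows_write, total), total)
    return results


def reject_id_3(row):
    """Stand-in for a create/update/delete call that fails for one row."""
    if row["id"] == 3:
        raise ValueError("id 3 is not allowed")


rows = [{"id": i} for i in range(1, 6)]
progress = []
results = load_in_batches(
    rows,
    reject_id_3,
    n_rows_write=2,
    die_on_error=False,
    callback=lambda done, total: progress.append((done, total)),
)
print([r["status"] for r in results], progress)
```

With `die_on_error=False` the failing row is recorded and the load continues; with the default `True` the same input raises on the third row.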

ping

Check if the database connection is active.

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if a connection exists, False otherwise. |
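
The reference only says that `ping` reports whether a connection exists. A common way to perform such a check is a trivial round trip to the database, sketched here with the standard-library `sqlite3` module. This is an assumed technique, not necessarily what connectx does, and the standalone `ping` function is hypothetical.

```python
import sqlite3


def ping(conn) -> bool:
    """Return True if a trivial query succeeds on the connection, False otherwise."""
    if conn is None:
        return False
    try:
        conn.execute("SELECT 1").fetchone()
        return True
    except sqlite3.Error:
        # A closed or broken connection raises a sqlite3.Error subclass.
        return False


conn = sqlite3.connect(":memory:")
alive = ping(conn)
conn.close()
closed = ping(conn)
print(alive, closed)
```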

query

Execute a SQL query and return the results as a Polars DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `query` | `str \| Selectable` | SQL query string or SQLAlchemy Selectable object. | required |
| `callback` | `callable` | Optional callback function. | `None` |

Returns:

| Type | Description |
| --- | --- |
| | Polars DataFrame containing the query results. |

read

Read data from a table with optional filters, ordering, and limits.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `module` | `str` | Table name. | required |
| `filter` | `str` | Optional SQL WHERE filter. | `None` |
| `fields` | `list` | List of columns to read (default all columns). | `None` |
| `order_by` | `str` | Column to order by (optional). | `None` |
| `limit` | `int` | Maximum number of rows to read (optional). | `None` |
| `callback` | `callable` | Optional callback function to track progress. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | Polars DataFrame with the retrieved data. |
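
The parameters above map naturally onto the clauses of a SELECT statement. The sketch below shows one way such a statement could be assembled; `build_select` is a hypothetical helper, not part of connectx, and the `LIMIT` syntax is an assumption that does not hold for every SQL dialect.

```python
def build_select(module, filter=None, fields=None, order_by=None, limit=None):
    """Assemble a SELECT statement from optional filter, columns, order, and limit."""
    columns = ", ".join(fields) if fields else "*"  # no fields means all columns
    sql = f"SELECT {columns} FROM {module}"
    if filter:
        sql += f" WHERE {filter}"
    if order_by:
        sql += f" ORDER BY {order_by}"
    if limit is not None:
        # Assumption: the target dialect supports LIMIT (SQL Server, for one, does not).
        sql += f" LIMIT {int(limit)}"
    return sql


everything = build_select("users")
narrowed = build_select(
    "users",
    filter="active = 1",
    fields=["id", "name"],
    order_by="name",
    limit=10,
)
print(everything)
print(narrowed)
```

Because `filter` is passed through as raw SQL, it should never be built from untrusted input.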

read_table

Alias for the query() method: reads a table or runs a query against the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `query` | `str \| Selectable` | SQL query string or SQLAlchemy Selectable object. | required |
| `kwargs` | | Additional arguments passed to query. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| | Polars DataFrame containing the query results. |

retrieve

Retrieve a record from the specified table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `module` | `str` | Module or table name. | required |
| `id` | `str \| int \| dict \| list[str \| int]` | Record identifier (single, list, or dict). | required |
| `primary_key` | `str \| list[str]` | Primary key to use (optional). | `None` |

Returns:

| Type | Description |
| --- | --- |
| | Dictionary with the retrieved record data. |

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If the record is not found or another error occurs. |
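
Since `id` may be a single value, a list, or a dict, and `primary_key` may name one or several columns, a lookup has to normalize both before it can filter. The sketch below shows one possible interpretation using the standard-library `sqlite3` module. The helpers `build_where` and `retrieve_record` are hypothetical, and the reading of a list as positional values for a composite key and of a dict as a column-to-value mapping is an assumption, not documented connectx behavior.

```python
import sqlite3


def build_where(id, primary_key="id"):
    """Turn an identifier into a WHERE clause and its bound values."""
    keys = [primary_key] if isinstance(primary_key, str) else list(primary_key)
    if isinstance(id, dict):
        # Assumed: a dict maps column names to the values to match.
        keys, values = list(id.keys()), list(id.values())
    elif isinstance(id, (list, tuple)):
        # Assumed: a list supplies one value per primary-key column, in order.
        values = list(id)
    else:
        values = [id]
    if len(keys) != len(values):
        raise ValueError("identifier does not match the primary key columns")
    return " AND ".join(f"{key} = ?" for key in keys), values


def retrieve_record(conn, module, id, primary_key="id"):
    """Fetch one row as a dict; raise if nothing matches."""
    clause, values = build_where(id, primary_key)
    row = conn.execute(f"SELECT * FROM {module} WHERE {clause}", values).fetchone()
    if row is None:
        raise LookupError(f"no record in {module} for {id!r}")
    return dict(row)


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE prices (sku TEXT, region TEXT, amount REAL, PRIMARY KEY (sku, region))"
)
conn.execute("INSERT INTO prices VALUES ('A1', 'EU', 9.5)")

by_list = retrieve_record(conn, "prices", ["A1", "EU"], primary_key=["sku", "region"])
by_dict = retrieve_record(conn, "prices", {"sku": "A1"})
print(by_list, by_dict)
```

`LookupError` is used here only as a convenient `Exception` subclass; the reference does not name the exact exception type raised for a missing record.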

table_name

Get the full table name (schema.table).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `module` | `str` | Module or table name. | required |

Returns:

| Type | Description |
| --- | --- |
| `str` | Full table name including schema if available. |
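
The schema qualification described above amounts to prefixing the table name when a schema is configured. A minimal sketch, using a hypothetical standalone function in place of the method and passing the schema explicitly instead of reading it from the connector:

```python
from typing import Optional


def qualified_table_name(module: str, schema_name: Optional[str] = None) -> str:
    """Return 'schema.table' when a schema is set, otherwise just the table name."""
    return f"{schema_name}.{module}" if schema_name else module


with_schema = qualified_table_name("orders", "sales")
without_schema = qualified_table_name("orders")
print(with_schema, without_schema)
```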

update

Update an existing record in the table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `module` | `str` | Module or table name. | required |
| `data` | `dict` | Dictionary of data to update. | required |
| `id` | `str \| int \| dict \| list[str \| int]` | Identifier of the record to update. | `None` |
| `primary_key` | `str \| list[str]` | Primary key of the record (optional). | `None` |

Returns:

| Type | Description |
| --- | --- |
| | Dictionary with the updated record data. |
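
Because `id` defaults to `None`, an update presumably needs another way to locate the record when it is omitted. The sketch below assumes the key value is then taken from `data` itself. That fallback is a guess, not documented behavior, and `update_record` is a hypothetical helper built on the standard-library `sqlite3` module, limited to a single-column key.

```python
import sqlite3


def update_record(conn, module, data, id=None, primary_key="id"):
    """Update one row and return it as a dict."""
    # Assumption: when id is omitted, the key value is read from data.
    record_id = data[primary_key] if id is None else id
    # The key column itself is not rewritten.
    changes = {col: val for col, val in data.items() if col != primary_key}
    assignments = ", ".join(f"{col} = ?" for col in changes)
    conn.execute(
        f"UPDATE {module} SET {assignments} WHERE {primary_key} = ?",
        [*changes.values(), record_id],
    )
    row = conn.execute(
        f"SELECT * FROM {module} WHERE {primary_key} = ?", (record_id,)
    ).fetchone()
    return dict(row)


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users (name) VALUES ('Ada')")

explicit = update_record(conn, "users", {"name": "Ada Lovelace"}, id=1)
from_data = update_record(conn, "users", {"id": 1, "name": "A. Lovelace"})
print(explicit, from_data)
```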