Designing the First Flow
Project Structure
- /custom
- /data
- /backup
- /export
- /import
- /errors
- /jobs
- /logs
- /maps
- config.py
- main.py
- test_connections.py
Edit config.py
- Equivalent to Talend contexts: a dictionary `env` composed of sub-dictionaries passed during object initialization.
- Divided into 3 main dictionaries: dev, test, prod.
- Environment is selected when running the file, e.g.:
python main.py -e test
- Default environment is dev.
- Keys specified outside the 3 main dictionaries apply to all environments.
- Only specify keys that change within the specific environment; static keys go outside.
- If the same dictionary exists both inside and outside an environment, the external dictionary is updated with the environment-specific keys.
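A minimal sketch of what this could look like; every key name and the `build_env` helper are illustrative assumptions, only the dev/test/prod split, the dev default, and the merge rule come from the notes above:

```python
# config.py -- illustrative sketch; key names and build_env are assumptions.

# Keys outside the three environment dictionaries apply to all environments.
common = {
    "batch_size": 500,
    "sftp": {"host": "sftp.example.com", "port": 22},
}

# Inside an environment, specify only the keys that change.
dev = {"sftp": {"host": "localhost"}}
test = {"sftp": {"host": "sftp-test.example.com"}}
prod = {}

def build_env(name="dev"):
    """Merge an environment's keys over the shared ones (dev is the default)."""
    env = {k: dict(v) if isinstance(v, dict) else v for k, v in common.items()}
    for key, value in {"dev": dev, "test": test, "prod": prod}[name].items():
        if isinstance(value, dict) and isinstance(env.get(key), dict):
            # Same dictionary inside and outside the environment: the external
            # one is updated with the environment-specific keys.
            env[key].update(value)
        else:
            env[key] = value
    return env
```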
Passwords
- Must be encrypted using:
python main.py -p my_password_abcde
- During development, plain text passwords are still interpreted correctly.
- For client installations, all passwords must be encrypted in every environment.
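For reference, the two flags above could be parsed in `main.py` roughly like this (a sketch: the long option names and the `encrypt()` helper are assumptions):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-e", "--environment", default="dev",
                    choices=["dev", "test", "prod"])
parser.add_argument("-p", "--password",
                    help="print the encrypted form of the password and exit")
args = parser.parse_args()

if args.password:
    print(encrypt(args.password))  # encrypt() assumed to ship with the framework
    raise SystemExit(0)
```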
Run test_connections.py
- Executable to test connectors, including email sending:
python test_connections.py <environment>
- Useful for production releases to test connections without running the full flow.
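A hypothetical sketch of what the script does; only the CLI usage and the fact that connectors (email included) are tested come from this page, the connector factory and `test()` method are assumptions:

```python
# test_connections.py -- hypothetical sketch.
import sys
from config import build_env  # hypothetical helper, see the config.py sketch

env = build_env(sys.argv[1] if len(sys.argv) > 1 else "dev")
for name, connector in make_connectors(env).items():  # assumed factory
    try:
        connector.test()  # assumed check (the email connector sends a test mail)
        print(f"{name}: OK")
    except Exception as exc:
        print(f"{name}: FAILED ({exc})")
```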
Edit main.py
- Typical structure (see the sketch after this section):
  - Create the `etl` object and call `etl.start()`.
  - Optional file reading from SFTP.
  - Run jobs with `etl.run()`:
    - Dependent jobs can be passed together (if one fails, subsequent jobs stop).
    - Independent jobs can be run individually.
    - In case of errors, an email is sent with the error files attached.
    - For multiple individual calls, collect the error files and send a single email at the end if needed.
  - Optional backup and deletion of old files.
  - Call `etl.end()`.
- Create `main.py` with all flows in the correct order, to respect object dependencies.
- Multiple `main.py` files are allowed for flows with different scheduling requirements.
Examples:
- KPI report: 20th of each month
- Daily companies flow: 6 AM
- Offers flow: every 5 minutes from 7 AM to 7 PM
⚠️ Attention: For flows with frequent schedules, verify data consistency.
Example: If offers run every 5 minutes, ensure the daily company data has been processed first; otherwise, run an ad-hoc flow for the missing records.
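A minimal sketch of the typical `main.py` structure described above; the `Etl` class, its constructor, the return value of `etl.run()`, and the `send_error_email` helper are assumptions, only `start()`/`run()`/`end()` and the error-collection pattern come from this page:

```python
# main.py -- illustrative sketch, not the framework's actual API.
from config import build_env  # hypothetical helper, see the config.py sketch

env = build_env("dev")
etl = Etl(env)  # class name assumed
etl.start()

# Dependent jobs passed together: if "companies" fails, "contacts" is skipped.
etl.run(["companies", "contacts"])

# Independent jobs run individually; collect error files, send one email at the end.
error_files = []
for job in ("offers", "kpi_report"):
    error_files += etl.run([job])  # return value assumed to be the job's error files

if error_files:
    send_error_email(attachments=error_files)  # hypothetical helper

etl.end()
```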
Create Jobs and Maps
- Start jobs from a starter pack or template in `jobs/`.
- Development occurs within the `run` function.
- Job phases (sketched in code below):
  - Initialization: set up connectors.
  - EXTRACT: read files, modules, tables, etc.
  - TRANSFORM: use Polars DataFrames to prepare the data (check duplicates, group by, join, etc.).
  - LOAD: load the DataFrame using the field transformations specified in the map file.
  - LINK: handle relationships between records if needed.
  - Catcher: analyze the response DataFrame (from LOAD and LINK) and create error files in `errors/` if necessary.
- Create the corresponding mapping file in `maps/` and import the transform function into the job:
  `from maps.accounts_map import transform`
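To make the phases concrete, here is a sketch of a map file and a job; the connector API (`etl.connector`, `.read`, `.load`) and all field names are assumptions, only the phase structure, the Polars usage, and the `maps/` import come from this page:

```python
# maps/accounts_map.py -- illustrative map file with the field transformations.
import polars as pl

def transform(df: pl.DataFrame) -> pl.DataFrame:
    return df.select(
        pl.col("account_id").alias("Id"),               # target field names assumed
        pl.col("name").str.strip_chars().alias("Name"),
    )


# jobs/accounts_job.py -- illustrative job following the phases above.
import polars as pl
from maps.accounts_map import transform

def run(etl):
    # Initialization: set up connectors (API assumed).
    source = etl.connector("source_db")
    target = etl.connector("crm")

    # EXTRACT: read the raw records into a Polars DataFrame.
    df = pl.DataFrame(source.read("accounts"))

    # TRANSFORM: prepare the data, e.g. drop duplicates.
    df = df.unique(subset=["account_id"])

    # LOAD: apply the map file's field transformations, then load.
    response = target.load(transform(df))

    # Catcher: write failed rows to errors/ so the error email can attach them.
    failed = response.filter(~pl.col("success"))
    if failed.height > 0:
        failed.write_csv("errors/accounts_errors.csv")
```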