
Designing the First Flow

Project Structure

  • /custom
  • /data
  • /backup
  • /export
  • /import
  • /errors
  • /jobs
  • /logs
  • /maps
  • config.py
  • main.py
  • test_connections.py

Edit config.py

  • Equivalent to Talend contexts: a dictionary env made up of sub-dictionaries, passed to objects at initialization.
  • Divided into three main dictionaries: dev, test, prod.
  • The environment is selected when running the file, e.g.:
python main.py -e test
  • Default environment is dev.
  • Keys specified outside the 3 main dictionaries apply to all environments.
  • Only specify keys that change within the specific environment; static keys go outside.
  • If the same dictionary exists both inside and outside an environment, the external dictionary is updated with the environment-specific keys.
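
As an illustration only, config.py might be shaped like the sketch below. The key names (sftp, database) and the resolve helper are hypothetical; the merge is assumed to behave like a per-dictionary dict.update.

# config.py -- hypothetical sketch; key names and values are illustrative.

env = {
    # Keys outside the three main dictionaries apply to all environments.
    "project_name": "acme_etl",
    "sftp": {"host": "sftp.example.com", "port": 22},

    # Only keys that change per environment go inside dev/test/prod.
    "dev": {
        "database": {"host": "localhost", "user": "dev_user"},
    },
    "test": {
        "database": {"host": "test-db.internal", "user": "test_user"},
    },
    "prod": {
        "database": {"host": "prod-db.internal", "user": "prod_user"},
        # Same dictionary inside and outside: the external "sftp" is
        # updated with these keys, so "port" is inherited from above.
        "sftp": {"host": "sftp.client.com"},
    },
}

def resolve(environment="dev"):
    """Hypothetical merge: shared keys updated with the environment's keys."""
    merged = {k: v for k, v in env.items() if k not in ("dev", "test", "prod")}
    for key, value in env[environment].items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = {**merged[key], **value}
        else:
            merged[key] = value
    return merged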

Passwords

  • Must be encrypted using:
python main.py -p my_password_abcde
  • During development, plain text passwords are still interpreted correctly.
  • For client installations, all passwords must be encrypted in every environment.
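
As a hedged illustration of that fallback (not the framework's confirmed mechanism), decryption might degrade gracefully to plain text. The Fernet cipher and the get_password helper below are assumptions.

# Hypothetical sketch of the password fallback; Fernet is an illustrative
# cipher choice, and the real encryption scheme may differ.
from cryptography.fernet import Fernet, InvalidToken

def get_password(value: str, key: bytes) -> str:
    try:
        # Encrypted values (e.g. produced via "python main.py -p ...") decrypt here.
        return Fernet(key).decrypt(value.encode()).decode()
    except InvalidToken:
        # During development, a plain-text password is accepted as-is.
        return value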

Run test_connections.py

  • A runnable script for testing the connectors, including email sending:
python test_connections.py <environment>
  • Useful for production releases to test connections without running the full flow.
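
The script presumably follows a shape like the sketch below; the probe targets are hypothetical, and only the pattern (try each connector, report, exit nonzero on failure) reflects the description above.

# test_connections.py -- illustrative sketch; the real connectors are built
# from config.py for the selected environment and may differ.
import smtplib
import socket
import sys

def check(name, probe):
    try:
        probe()
        print(f"[OK]   {name}")
        return True
    except Exception as exc:
        print(f"[FAIL] {name}: {exc}")
        return False

def main(environment):
    print(f"Testing connections for environment: {environment}")
    # Hypothetical probes; a real run would include one per configured
    # connector, including the SMTP server used for error emails.
    probes = {
        "smtp": lambda: smtplib.SMTP("smtp.example.com", 587, timeout=10).quit(),
        "sftp": lambda: socket.create_connection(("sftp.example.com", 22), timeout=10).close(),
    }
    results = [check(name, probe) for name, probe in probes.items()]
    return 0 if all(results) else 1

if __name__ == "__main__":
    sys.exit(main(sys.argv[1] if len(sys.argv) > 1 else "dev"))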

Edit main.py

  • Typical structure (see the sketch at the end of this section):
    1. Create etl object and call etl.start().
    2. Optional file reading from SFTP.
    3. Run jobs with etl.run():
      • Dependent jobs can be passed together (if one fails, subsequent jobs stop).
      • Independent jobs can be run individually.
      • If errors occur, an email is sent with the error files attached.
      • For multiple individual calls, collect the error files and send a single email at the end if needed.
    4. Optional backup and deletion of old files.
    5. Call etl.end().
  • main.py contains all flows in the correct order to respect object dependencies.
  • Multiple main.py files are allowed for flows with different scheduling requirements.
    Examples:
    • KPI report: 20th of each month
    • Daily companies flow: 6 AM
    • Offers flow: every 5 minutes from 7 AM to 7 PM

⚠️ Attention: For flows with frequent schedules, verify data consistency.
Example: If offers run every 5 minutes, make sure the daily company data has already been processed; otherwise, run an ad-hoc flow for the missing records.
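
Putting the steps together, a main.py might read like the sketch below. The Etl class, the job classes, and the method names are assumptions based on the structure above, not the framework's confirmed API.

# main.py -- hedged sketch; Etl, the job classes, and the method names
# are assumptions drawn from the five steps above.
from etl import Etl                        # hypothetical framework import
from jobs.accounts_job import AccountsJob  # hypothetical job
from jobs.offers_job import OffersJob      # hypothetical job

def main():
    etl = Etl()                  # 1. create the etl object ...
    etl.start()                  #    ... and start the flow

    etl.read_from_sftp()         # 2. optional file reading from SFTP

    # 3. Dependent jobs passed together: if AccountsJob fails, OffersJob
    #    does not run and an email with the error files is sent.
    etl.run(AccountsJob, OffersJob)

    etl.backup()                 # 4. optional backup and deletion of old files
    etl.end()                    # 5. close the flow

if __name__ == "__main__":
    main()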

Create Jobs and Maps

  • Start jobs from a starter pack or template in jobs/.
  • Development occurs within the run function.
  • Job phases:
    1. Initialization: setup connectors.
    2. EXTRACT: read files, modules, tables, etc.
    3. TRANSFORM: use Polars DataFrames to prepare data (check duplicates, group by, join, etc.).
    4. LOAD: load DataFrame using field transformations specified in the map file.
    5. LINK: handle relationships between records if needed.
    6. Catcher: analyze the response DataFrame (from LOAD and LINK) and create error files in errors/ if necessary.
  • Create the corresponding mapping file in maps/ and import the transform function into the job:
from maps.accounts_map import transform
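
For orientation only, a job and its map might be sketched as below. The Job base class, the self.target connector, and the response columns are assumptions; the six-phase shape and the Polars calls follow the list above.

# jobs/accounts_job.py -- illustrative sketch of the six phases; the base
# class, self.target, and the response columns are assumptions.
import polars as pl
from jobs.base import Job  # hypothetical starter-pack base class
from maps.accounts_map import transform

class AccountsJob(Job):
    def run(self):
        # 1. Initialization: connectors assumed to be set up by the base class.

        # 2. EXTRACT: read the source into a DataFrame.
        df = pl.read_csv("import/accounts.csv")

        # 3. TRANSFORM: prepare the data (duplicates, group by, join, ...).
        df = df.unique(subset=["account_id"])

        # 4. LOAD: apply the map's field transformations, then load.
        response = self.target.load(transform(df))

        # 5. LINK: relate records to previously loaded ones, if needed.

        # 6. Catcher: write failed rows to errors/ for the error email.
        errors = response.filter(pl.col("status") == "error")
        if errors.height > 0:
            errors.write_csv("errors/accounts_errors.csv")

The matching map file could be as small as:

# maps/accounts_map.py -- hypothetical field transformations
import polars as pl

def transform(df: pl.DataFrame) -> pl.DataFrame:
    return df.select(
        pl.col("account_id").alias("external_id"),
        pl.col("company_name").str.strip_chars().alias("name"),
    )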