Skip to content

Solution 1

Solution

Example of the resulting job for one possible solution:

import connectx
import polars as pl
from connectx import Etl, Task
from connectx.etllog import EtlLog, EtlJob
from connectx.sftp import SFTP
from connectx.crud.sugarcrm import SugarCRM
from datetime import datetime

# Identifier of this ETL job module (presumably read by the connectx runner to
# register/select the job — TODO confirm against the framework).
name = "export_prospects"


def run(etl: Etl, etllog: EtlLog, etljob: EtlJob):
    """Export SugarCRM prospect accounts to a CSV file and upload it via SFTP.

    Steps:
      1. Extract: read ``Accounts`` records whose ``account_type`` is
         ``"Prospect"`` and whose ``date_modified`` is greater than or equal
         to the job's ``last_start_date``.
      2. Transform: rename the ``fatturato_c`` column to ``annual_revenue``.
      3. Load: write a timestamped CSV file; only when that write yields no
         catchers, upload through SFTP and then call
         ``connectx.move_and_delete_files_backup``.

    Args:
        etl: Supplies configuration through ``etl.get(<dotted key>)`` and runs
            the extract task through ``etl.run_tasks``.
        etllog: Passed as ``callback`` to the SugarCRM read task.
        etljob: Supplies ``last_start_date`` for the incremental filter.

    Returns:
        The list accumulated from ``connectx.catcher`` for the CSV write
        (presumably error records — confirm against connectx). An empty list
        means the upload step was attempted.
    """
    # --------------------------------------------------------------------------
    #   SugarCRM client
    sugarcrm = SugarCRM(**etl.get("sugar"))

    # --------------------------------------------------------------------------
    #   EXTRACT

    responses = etl.run_tasks([
        Task(
            sugarcrm.read,
            module='Accounts',
            fields=[
                "id",
                "name",
                "fatturato_c",
                "phone_office",
                "phone_alternate",
                "billing_address_street",
                "billing_address_city",
                "billing_address_state",
                "billing_address_postalcode",
                "billing_address_country",
                "shipping_address_street",
                "shipping_address_city",
                "shipping_address_state",
                "shipping_address_postalcode",
                "shipping_address_country",
                "email1",
                "account_type",
                "description",
            ],
            # Incremental extraction: only prospects modified since the
            # previous job start.
            filter=[{
                "account_type": "Prospect",
                "date_modified": {"$gte": etljob.get("last_start_date")},
            }],
            callback=etllog,
        )
    ])
    # Only one task was submitted, so its result is the first response.
    read_sugar_prospects = responses[0]

    # --------------------------------------------------------------------------
    #   TRANSFORM
    # https://docs.pola.rs/user-guide/expressions/

    load_sugar_prospects = read_sugar_prospects.rename(
        {"fatturato_c": "annual_revenue"}
    )

    # --------------------------------------------------------------------------
    #   LOAD

    catchers = []

    # The file is always generated, even when there is no data; otherwise it
    # would look as if the ETL were not running correctly.

    # Current date and time, formatted as yyyy_mm_dd_hh_mi.
    formatted_time = datetime.now().strftime("%Y_%m_%d_%H_%M")

    # Build the file name. The suffix is looked up outside the f-string:
    # reusing the enclosing quote character inside an f-string expression is a
    # SyntaxError before Python 3.12.
    file_suffix = etl.get("write_csv.prospects.file_suffix")
    file_name = f"{formatted_time}_{file_suffix}"

    # The directory is concatenated as-is, so the configured value must already
    # end with a path separator.
    response_write_csv = connectx.write_csv(
        df=load_sugar_prospects,
        file=etl.get("write_csv.prospects.dir") + file_name,
        separator=";",
        quote_style="never",
        encoding="utf8",
    )

    catchers += connectx.catcher(response_write_csv)

    if not catchers:
        # Upload only when writing the CSV produced no catchers.
        # NOTE(review): the upload and the backup use
        # "sftp.prospects.local_path", not the path written above — presumably
        # the configuration points both at the same location; confirm.
        sftp_client = SFTP(**etl.get("sftp.prospects"))
        sftp_client.upload(local_path=etl.get("sftp.prospects.local_path"))

        connectx.move_and_delete_files_backup(
            file_path=etl.get("sftp.prospects.local_path")
        )

    return catchers