Solution 1
Solution
Example job, the result of one possible solution:
import connectx
import polars as pl
from connectx import Etl, Task
from connectx.etllog import EtlLog, EtlJob
from connectx.sftp import SFTP
from connectx.crud.sugarcrm import SugarCRM
from datetime import datetime
# Job name; presumably the identifier the ETL runner uses for this job — TODO confirm.
name = "export_prospects"
def run(etl: Etl, etllog: EtlLog, etljob: EtlJob) -> list:
    """Export SugarCRM prospect accounts to a CSV file and upload it via SFTP.

    Steps:
      1. Extract: read ``Accounts`` records whose ``account_type`` is
         ``"Prospect"`` and whose ``date_modified`` is greater than or equal
         to ``etljob.get("last_start_date")``.
      2. Transform: rename the ``fatturato_c`` column to ``annual_revenue``.
      3. Load: write the frame to a timestamped, ``;``-separated CSV file and,
         if no problem was collected, upload it over SFTP and hand the local
         path to ``connectx.move_and_delete_files_backup``.

    Args:
        etl: Job context; provides configuration via ``etl.get(...)`` and
            task execution via ``etl.run_tasks(...)``.
        etllog: Passed as the ``callback`` of the SugarCRM read task.
        etljob: Job state; ``etljob.get("last_start_date")`` supplies the
            lower bound of the ``date_modified`` filter.

    Returns:
        The list of collected "catchers"; empty when the CSV write reported
        nothing to ``connectx.catcher``.
    """
    # --------------------------------------------------------------------
    # SugarCRM
    sugarcrm = SugarCRM(**etl.get("sugar"))

    # --------------------------------------------------------------------
    # EXTRACT
    responses = etl.run_tasks([
        Task(
            sugarcrm.read,
            module='Accounts',
            fields=[
                "id",
                "name",
                "fatturato_c",
                "phone_office",
                "phone_alternate",
                "billing_address_street",
                "billing_address_city",
                "billing_address_state",
                "billing_address_postalcode",
                "billing_address_country",
                "shipping_address_street",
                "shipping_address_city",
                "shipping_address_state",
                "shipping_address_postalcode",
                "shipping_address_country",
                "email1",
                "account_type",
                "description",
            ],
            filter=[{
                "account_type": "Prospect",
                "date_modified": {"$gte": etljob.get("last_start_date")},
            }],
            callback=etllog,
        ),
    ])
    # Only one task was submitted, so its result is the first response.
    read_sugar_prospects = responses[0]

    # --------------------------------------------------------------------
    # TRANSFORM
    # https://docs.pola.rs/user-guide/expressions/
    read_sugar_prospects = read_sugar_prospects.rename({"fatturato_c": "annual_revenue"})
    load_sugar_prospects = read_sugar_prospects

    # --------------------------------------------------------------------
    # LOAD
    catchers = []

    # The file is always generated, even when there is no data; otherwise it
    # would look as if the ETL were not running correctly. For that reason
    # there is deliberately no "is the frame empty?" guard here.

    # Current date and time, formatted as yyyy_mm_dd_hh_mi.
    timestamp = datetime.now()
    formatted_time = timestamp.strftime("%Y_%m_%d_%H_%M")

    # Build the file name. The suffix is looked up outside the f-string:
    # reusing double quotes inside a double-quoted f-string is a syntax error
    # on Python versions before 3.12.
    file_suffix = etl.get("write_csv.prospects.file_suffix")
    file_name = f"{formatted_time}_{file_suffix}"

    response_write_csv = connectx.write_csv(
        df=load_sugar_prospects,
        file=etl.get("write_csv.prospects.dir") + file_name,
        separator=";",
        quote_style="never",
        encoding="utf8",
    )
    # NOTE(review): assumes connectx.catcher returns a list that is empty on
    # success, so that ``+=`` extends ``catchers`` — confirm against connectx.
    catcher = connectx.catcher(response_write_csv)
    catchers += catcher

    if not catchers:
        # Upload only when writing the CSV produced no problems.
        sftp_client = SFTP(**etl.get("sftp.prospects"))
        sftp_client.upload(local_path=etl.get("sftp.prospects.local_path"))
        connectx.move_and_delete_files_backup(
            file_path=etl.get("sftp.prospects.local_path")
        )

    return catchers