Solution 2
Solution
Example Job for a possible solution:
import connectx
from connectx import Etl
import polars as pl
def run(etl: Etl):
    # ------------------------------------------------------------------------------------------------------------------------------------------
    # EXTRACT
    # Read contacts file
    read_contacts = connectx.read_csv(**etl.get("read_csv.contacts"))
    # Read HubSpot contacts file
    hubspot_contacts = connectx.read_csv(**etl.get("read_csv.hubspot_contacts"))
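    # Note: etl.get("read_csv.contacts") is assumed to return the keyword
    # arguments configured for this source (file path, separator, ...),
    # which are unpacked into connectx.read_csv; this follows from how the
    # call is used here rather than from connectx documentation.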
    # ------------------------------------------------------------------------------------------------------------------------------------------
    # TRANSFORM
    # Add column for email validation
    read_contacts = read_contacts.with_columns(
        pl.col("Email").map_elements(connectx.valid_email, return_dtype=pl.Boolean).alias("is_valid_email")
    )
    # Separate records with invalid email
    invalid_email_df = (
        read_contacts.filter(~pl.col("is_valid_email"))
        .drop("is_valid_email")
        .with_columns(
            # Create error column
            pl.lit("invalid email address").alias("etl_error")
        )
    )
    valid_email_df = read_contacts.filter(pl.col("is_valid_email")).drop("is_valid_email")
    # Find records with duplicated emails
    duplicated_emails = (
        valid_email_df
        .with_columns(pl.col("Email").is_duplicated().alias("email_duplicated"))
        .filter(pl.col("email_duplicated"))
        .drop("email_duplicated")
    )
    # Find records with duplicated customer code
    duplicated_key = (
        duplicated_emails
        .with_columns(pl.col("Codice Cliente Embyon").is_duplicated().alias("key_duplicated"))
        .filter(pl.col("key_duplicated"))
        .drop("key_duplicated")
    )
    # Find records with non-duplicated email and customer code
    valid_not_duplicated_key = (
        duplicated_emails
        .join(duplicated_key, on="Email", how="anti")
        .unique("Email", keep="none")
    )
    # Find records with duplicated email and non-duplicated code
    invalid_not_duplicated_key = (
        duplicated_emails
        .join(duplicated_key, on="Email", how="anti")
        .join(valid_not_duplicated_key, on="Email", how="anti")
        .with_columns(
            # Create error column
            pl.lit("mail duplicated with multi cod embyon").alias("etl_error")
        )
    )
    # Merge invalid records
    invalid_df = pl.concat([invalid_email_df, invalid_not_duplicated_key])
    # DataFrame with valid records
    final_valid_df = valid_email_df.join(invalid_df, on="Email", how="anti").unique("Email", keep="last")
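    # keep="last" collapses any duplicate emails that were not flagged as
    # errors above (the same email repeated with the same customer code)
    # down to a single row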
    # ------------------------------------------------------------------------------------------------------------------------------------------
    # LOAD
    # Update if customer code is equal or empty
    update1 = (
        final_valid_df
        .join(hubspot_contacts, left_on="Email", right_on="email")
        .filter((pl.col("Codice Cliente Embyon") == pl.col("codice_embyon")) | (pl.col("codice_embyon") == ""))
        .drop("codice_embyon")
    )
    # Update if customer code is equal and email does not exist in HubSpot
    update2 = final_valid_df.join(hubspot_contacts, left_on="Codice Cliente Embyon", right_on="codice_embyon", how="semi")
    update = pl.concat([update1, update2]).unique()
    # Insert if no record with the same email is found in HubSpot
    insert1 = final_valid_df.join(hubspot_contacts, left_on="Email", right_on="email", how="anti")
    # Insert if no record with the same Embyon code is found in HubSpot
    insert2 = final_valid_df.join(hubspot_contacts, left_on="Codice Cliente Embyon", right_on="codice_embyon", how="anti")
    insert = (
        pl.concat([insert1, insert2])
        .join(update, on="Email", how="anti")
        .join(update, on="Codice Cliente Embyon", how="anti")
        .unique()
    )
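    # The two anti joins against update keep insert and update disjoint,
    # so no contact ends up in both output files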
    # Other cases are not valid
    invalid_load_records = (
        final_valid_df
        .join(pl.concat([insert, update]), on="Email", how="anti")
        .with_columns(
            # Create error column
            pl.lit("email with another code embyon already exists").alias("etl_error")
        )
    )
    final_invalid_df = pl.concat([invalid_df, invalid_load_records])
    # Load DataFrames into files
    connectx.write_csv(update, file="data/export/hubspot_update.csv", separator=';')
    connectx.write_csv(insert, file="data/export/hubspot_insert.csv", separator=';')
    connectx.write_csv(final_invalid_df, file="data/export/etl_error.csv", separator=';')
    # ------------------------------------------------------------------------------------------------------------------------------------------
    return connectx.catcher()
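
A minimal, self-contained Polars sketch of the duplicate-handling rule used in the TRANSFORM step above (the rows are made up; only the column names "Email" and "Codice Cliente Embyon" come from the job):

import polars as pl

contacts = pl.DataFrame({
    "Email": ["a@x.it", "a@x.it", "b@x.it", "c@x.it", "c@x.it"],
    "Codice Cliente Embyon": ["001", "001", "002", "003", "004"],
})

# Rows whose email appears more than once
dup_mail = contacts.filter(pl.col("Email").is_duplicated())

# Among those, rows whose customer code is also repeated
# (the same email repeated with the same code)
dup_mail_and_code = dup_mail.filter(pl.col("Codice Cliente Embyon").is_duplicated())

# The same email repeated with different codes: ambiguous, rejected by the job
ambiguous = dup_mail.join(dup_mail_and_code, on="Email", how="anti")

print(ambiguous)  # only the two "c@x.it" rows, with codes 003 and 004
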
Repository
Example of a possible solution: