Skip to content

Solution 2

Solution

Example job result of a possible solution:

import connectx
from connectx import Etl
import polars as pl


def run(etl: Etl):
    """Run the contacts ETL job.

    Extracts two CSV inputs (local contacts and HubSpot contacts),
    validates emails, separates duplicate/conflicting records, and writes
    three output files: HubSpot updates, HubSpot inserts, and an error
    report for every record that could not be loaded.

    Parameters
    ----------
    etl : Etl
        Job configuration accessor; ``etl.get("read_csv.<name>")`` returns
        the keyword arguments for each ``connectx.read_csv`` call.

    Returns
    -------
    Whatever ``connectx.catcher()`` returns (job outcome/summary).

    Note
    ----
    The original listing mixed 8-space body indentation with a 4-space
    ``return``, which raises ``IndentationError`` at import time; the body
    is re-indented here to a consistent 4 spaces with no logic changes.
    """
    # ------------------------------------------------------------------------------------------------------------------------------------------
    #   EXTRACT

    # Read contacts file
    read_contacts = connectx.read_csv(**etl.get("read_csv.contacts"))

    # Read HubSpot contacts file
    hubspot_contacts = connectx.read_csv(**etl.get("read_csv.hubspot_contacts"))

    # ------------------------------------------------------------------------------------------------------------------------------------------
    #   TRANSFORM

    # Add column for email validation
    read_contacts = read_contacts.with_columns(pl.col("Email").map_elements(connectx.valid_email, pl.Boolean).alias("is_valid_email"))

    # Separate records with invalid email
    invalid_email_df = (
        read_contacts.filter(~pl.col("is_valid_email"))
        .drop("is_valid_email")
        .with_columns(
            # Create error column
            ( pl.lit("invalid email address").alias("etl_error") )
        )
    )
    valid_email_df = read_contacts.filter(pl.col("is_valid_email")).drop("is_valid_email")

    # Find records with duplicated emails
    # NOTE(review): the inner select/filter/drop leaves a zero-column frame
    # that is then passed to with_columns — confirm this actually marks the
    # duplicated-email rows as intended rather than being a no-op.
    duplicated_emails = valid_email_df.with_columns(valid_email_df.select(
        pl.col("Email").is_duplicated().alias("email_duplicated"))
        .filter(pl.col("email_duplicated"))
        .drop(["email_duplicated"]))

    # Find records with duplicated customer code
    # NOTE(review): here .is_duplicated() is called on the selected DataFrame
    # (whole-row duplication), unlike the expression-level call above — verify
    # the asymmetry is intentional.
    duplicated_key = duplicated_emails.with_columns(duplicated_emails.select(
        pl.col("Codice Cliente Embyon")).is_duplicated().alias("key_duplicated")) \
        .filter(pl.col("key_duplicated")) \
        .drop(["key_duplicated"])

    # Find records with non-duplicated email and customer code
    valid_not_duplicated_key = duplicated_emails.join(duplicated_key, on="Email", how="anti").unique("Email", keep="none")

    # Find records with duplicated email and non-duplicated code
    invalid_not_duplicated_key = duplicated_emails \
        .join(duplicated_key, on="Email", how="anti") \
        .join(valid_not_duplicated_key, on="Email", how="anti").with_columns(
            # Create error column
            ( pl.lit("mail duplicated with multi cod embyon").alias("etl_error") )
        )

    # Merge invalid records
    invalid_df = pl.concat([invalid_email_df, invalid_not_duplicated_key])

    # DataFrame with valid records
    final_valid_df = valid_email_df.join(invalid_df, on="Email", how="anti").unique("Email", keep="last")

    # ------------------------------------------------------------------------------------------------------------------------------------------
    #   LOAD

    # Update if customer code is equal or empty
    update1 = final_valid_df.join(hubspot_contacts, left_on="Email", right_on="email") \
        .filter((pl.col("Codice Cliente Embyon") == pl.col("codice_embyon")) | (pl.col("codice_embyon") == "")) \
        .drop("codice_embyon")

    # Update if customer code is equal and email does not exist in HubSpot
    update2 = final_valid_df.join(hubspot_contacts, left_on="Codice Cliente Embyon", right_on="codice_embyon", how="semi")
    update = pl.concat([update1, update2]).unique()

    # Insert if no record with the same email is found in HubSpot
    insert1 = final_valid_df.join(hubspot_contacts, left_on="Email", right_on="email", how="anti")

    # Insert if no record with the same Embyon code is found in HubSpot
    insert2 = final_valid_df.join(hubspot_contacts, left_on="Codice Cliente Embyon", right_on="codice_embyon", how="anti")
    insert = pl.concat([insert1, insert2]) \
        .join(update, on="Email", how="anti") \
        .join(update, on="Codice Cliente Embyon", how="anti") \
        .unique()

    # Other cases are not valid
    invalid_load_records = final_valid_df\
        .join(pl.concat([insert, update]), on="Email", how="anti")\
        .with_columns(
            # Create error column
            ( pl.lit("email with another code embyon already exists").alias("etl_error") )
        )

    final_invalid_df = pl.concat([invalid_df, invalid_load_records])

    # Load DataFrames into files
    connectx.write_csv(update, file="data/export/hubspot_update.csv", separator=';')
    connectx.write_csv(insert, file="data/export/hubspot_insert.csv", separator=';')
    connectx.write_csv(final_invalid_df, file="data/export/etl_error.csv", separator=';')

    # ------------------------------------------------------------------------------------------------------------------------------------------
    return connectx.catcher()

Repository

Example of a possible solution:

ConnectX Academy Repository