Hands-On Exercise: CSV Data Processing with ConnectX and Polars

Overview

This exercise simulates reading customer data from CSV files and writing it back to CSV files.
The goal is to learn the logic of a ConnectX flow and gain familiarity with Polars.

Note: No API calls to applications are required in this exercise.

You should start from the provided template and ensure all project parameters are set to test values.
Avoid sending error emails to production accounts (e.g., Impresoft Engage support).

Example files for this exercise are available in the shared Drive folder.

Legend

  • HS: HubSpot application
  • DF: Polars DataFrame

Objective

  • Prepare CSV files with data ready to load into HubSpot.
  • Prepare a CSV file with all records that do not meet the criteria for HubSpot loading.

Sources

  • contact_<data_yyyy_mm_dd>.csv – master file (sent by client via SFTP in a real scenario)
  • hubspot.csv – contains current HubSpot data (no HS connection required)

Destination Files

  • hubspot_insert.csv – records ready to insert into HS
  • hubspot_update.csv – records ready to update in HS
  • etl_error.csv – same structure as input file with an additional etl_error column describing the issue (e.g., "invalid email address")

Flow Steps

File Preparation

  • Place all source files in the designated directory (see best practices)
  • Configure the project correctly
  • Create the flow academy1.py and call it from main.py

Extract Phase

  • Read all contact_<data_yyyy_mm_dd>.csv files in order of the date in the file name
  • Store result in read_contacts DF
  • Read hubspot.csv
  • Store result in read_hubspot DF

Transform Phase

  • Run the following data consistency checks on read_contacts:
  • Exclude records with empty or invalid emails
  • Exclude duplicate emails associated with different codice_embyon values
  • For duplicate emails with the same or an empty codice_embyon, keep the last record
  • Excluded records go into a separate DF; valid ones go into read_valid_contacts

  • Check against HubSpot data (read_hubspot vs read_valid_contacts) using email and codice_embyon

INSERT

  • Neither the email nor the codice_embyon matches a record in HS
  • Records meeting this criterion go into load_insert_contacts DF

UPDATE

  • Match by both email and codice_embyon
  • Match by codice_embyon with a new email not present in HS
  • Match by email with empty codice_embyon in HS
  • Records meeting these criteria go into load_update_contacts DF

SKIP

  • All other cases go into a separate DF for skipped records

Load Phase

Create the following export files in the designated directory (see best practices):

  • hubspot_insert.csv – generated from load_insert_contacts
  • hubspot_update.csv – generated from load_update_contacts
  • etl_error.csv – concatenate all error DataFrames

Return to main.py to perform the following operations:

  • Move contact_<data_yyyy_mm_dd>.csv files to a backup directory
  • Delete files in the backup directory that are older than 30 days
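
For reference, the two housekeeping operations can be sketched with the standard library alone. This is not the implementation of connectx.move_and_delete_files_backup(), which is not shown in this exercise; the function name backup_and_purge, its parameters, and the file pattern are hypothetical, and file age is measured by modification time.

```python
import shutil
import time
from pathlib import Path


def backup_and_purge(
    source_dir: Path,
    backup_dir: Path,
    pattern: str = "contact_*.csv",
    max_age_days: int = 30,
) -> tuple[list[Path], list[Path]]:
    """Move processed files to backup_dir, then delete old backups."""
    backup_dir.mkdir(parents=True, exist_ok=True)

    moved = []
    for f in sorted(source_dir.glob(pattern)):
        target = backup_dir / f.name
        shutil.move(str(f), str(target))
        moved.append(target)

    # Anything last modified before the cutoff is considered expired.
    cutoff = time.time() - max_age_days * 86400
    deleted = []
    for f in sorted(backup_dir.iterdir()):
        if f.is_file() and f.stat().st_mtime < cutoff:
            f.unlink()
            deleted.append(f)
    return moved, deleted
```

Because a move normally preserves the modification time, a source file that is already older than the limit would be deleted in the same run; while testing, point both directories at throwaway folders.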

Tips

Use the following ConnectX (core) functions:

  • connectx.read_csv()
  • connectx.write_csv()
  • connectx.valid_email()
  • connectx.move_and_delete_files_backup()

Use the following Polars functions: