Hands-On Exercise: CSV Data Processing with ConnectX and Polars
Overview
This exercise simulates reading customer data from CSV files and writing it back to CSV files.
The goal is to learn the logic of a ConnectX flow and gain familiarity with Polars.
Note: No API calls to applications are required in this exercise.
You should start from the provided template and ensure all project parameters are set to test values.
Avoid sending error emails to production accounts (e.g., Impresoft Engage support).
Example files for this exercise are available in the shared Drive folder:
Drive Folder
Legend
- HS: HubSpot application
- DF: Polars DataFrame
Objective
- Prepare CSV files with data ready to load into HubSpot.
- Prepare a CSV file with all records that do not meet the criteria for HubSpot loading.
Sources
contact_<data_yyyy_mm_dd>.csv– master file (sent by client via SFTP in a real scenario)hubspot.csv– contains current HubSpot data (no HS connection required)
Destination Files
hubspot_insert.csv– records ready to insert into HShubspot_update.csv– records ready to update in HSetl_error.csv– same structure as input file with an additionaletl_errorcolumn describing the issue (e.g., "invalid email address")
Flow Steps
File Preparation
- Place all source files in the designated directory (see best practices)
- Configure the project correctly
- Create the flow
academy1.pyand call it frommain.py
Extract Phase
- Read all
contact_<data>.csvfiles in order - Store result in
read_contactsDF - Read
hubspot.csv - Store result in
read_hubspotDF
Transform Phase
- Data consistency checks on
read_contacts - Exclude records with empty or invalid emails
- Exclude duplicate emails associated with different
codice_embyon - For duplicate emails with same or empty
codice_embyon, keep the last record -
Excluded records go into separate DS, valid ones into
read_valid_contacts -
Check against HubSpot data (
read_hubspotvsread_valid_contacts) usingemailandcodice_embyon
INSERT
- No matching email or
codice_embyonin HS - Records meeting this criterion go into
load_insert_contactsDF
UPDATE
- Match by both email and
codice_embyon - Match by
codice_embyonwith a new email not present in HS - Match by email with empty
codice_embyonin HS - Records meeting these criteria go into
load_update_contactsDF
SKIP
- All other cases go into a separate DF for skipped records
Load Phase
Create the following export files in the designated directory (see best practices):
hubspot_insert.csv– generated fromload_insert_contactshubspot_update.csv– generated fromload_update_contactsetl_error.csv– concatenate all error DataFrames
Return to MAIN to perform the following operations:
- Move
contacts_<data_yyyy_mm_dd>.csvfiles to a backup directory - Delete files in the backup directory that are older than 30 days
Tips
Use the following ConnectX (core) functions:
connectx.read_csv()connectx.write_csv()connectx.valid_email()connectx.move_and_delete_files_backup()
Use the following Polars functions: