Customer Io
This document describes how to generate Customer.io-compatible output for syncing people (Identify), events (Track), objects (Group), and relationships (Group with userId) via the Snowflake Reverse ETL integration.
Overview
| Sync Type | Model | Source |
|---|---|---|
| Identify (people) | customer_io_persons |
nexus_entities (person) |
| Track (events) | customer_io_events |
nexus_events |
| Group (objects) | customer_io_<object_type> |
nexus_entities (group) |
| Group (relationships) | customer_io_<object>_relationships |
nexus_relationships |
People (Identify Sync)
The customer_io_identify macro generates a table formatted for Customer.io's
Reverse ETL Snowflake integration.
It automatically discovers all trait columns from nexus_entities, applies
proper timestamp formatting, and handles deduplication.
Quick Start
-- models/output/customer_io.sql
{{ config(materialized='table') }}
{{ nexus.customer_io_identify(
entity_type='person',
user_id_column='user_id',
anonymous_id_column='segment_anonymous_id',
dedupe_column='email',
filters=[
"email IS NOT NULL",
"status = 'active'"
]
) }}
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
entity_type |
string | 'person' |
Entity type to select from nexus_entities |
user_id_column |
string | 'user_id' |
Column to use as Customer.io userId |
anonymous_id_column |
string | none |
Column to use as anonymousId (optional) |
dedupe_column |
string | 'email' |
Column to deduplicate by (lowercased/trimmed) |
ignore_traits |
list | [] |
Trait column names to exclude from output |
rename_traits |
dict | {} |
Map of original_name -> new_name for renaming |
filters |
list | [] |
Additional WHERE clause conditions |
Output Format
The macro generates a table with the following structure:
| Column | Description |
|---|---|
userId |
Customer.io user identifier (from user_id_column) |
anonymousId |
Anonymous identifier (if anonymous_id_column set) |
timestamp |
When to apply the identify (from _updated_at) |
"trait_name" |
All other columns as quoted traits for Snowflake |
Features
Automatic Column Discovery
The macro uses adapter.get_columns_in_relation() to discover all columns from
nexus_entities at compile time. This means:
- New traits added to your entities are automatically included
- No need to manually list every trait column
- Use
ignore_traitsto exclude columns you don't want
Default Renames for Customer.io Reserved Traits
The macro automatically renames certain columns to match Customer.io's reserved trait names:
| Nexus Column | Customer.io Trait | Description |
|---|---|---|
_created_at |
created_at |
Date the user's account was created |
You can override these defaults via rename_traits.
Timestamp Transformation
Columns with timestamp types (TIMESTAMP_NTZ, TIMESTAMP_LTZ, TIMESTAMP_TZ,
DATE, DATETIME) are automatically wrapped with TO_TIMESTAMP_NTZ() for
proper Customer.io formatting.
Deduplication
The macro deduplicates by the dedupe_column (default: email), keeping the
most recently updated record:
QUALIFY ROW_NUMBER() OVER (
PARTITION BY LOWER(TRIM("email"))
ORDER BY "timestamp" DESC NULLS LAST
) = 1
Snowflake Compatibility
All trait columns are quoted ("trait_name") to ensure Snowflake compatibility
with mixed-case and special character column names.
System Columns
The following columns are automatically excluded from trait output:
entity_id- Internal nexus identifierentity_type- Used for filtering, not a trait_processed_at- Internal processing timestamptraits_entity_id- Internal join column
Examples
Basic Usage
Sync all persons with an email:
{{ nexus.customer_io_identify(
entity_type='person',
user_id_column='user_id',
filters=["email IS NOT NULL"]
) }}
With Anonymous ID
Include Segment anonymous IDs for anonymous user tracking:
{{ nexus.customer_io_identify(
entity_type='person',
user_id_column='user_id',
anonymous_id_column='segment_anonymous_id',
filters=["email IS NOT NULL"]
) }}
Excluding Traits
Exclude internal or debug traits:
{{ nexus.customer_io_identify(
entity_type='person',
user_id_column='user_id',
ignore_traits=[
'internal_score',
'debug_flag',
'test_variant'
],
filters=["email IS NOT NULL"]
) }}
Renaming Traits
Rename traits to match Customer.io attribute naming conventions:
{{ nexus.customer_io_identify(
entity_type='person',
user_id_column='user_id',
rename_traits={
'first_name': 'firstName',
'last_name': 'lastName',
'phone_number': 'phone'
},
filters=["email IS NOT NULL"]
) }}
Filtering by Customer Journey and Adding Custom Traits
Filter to only paying customers and add customer journey state as a trait:
{{ config(materialized='view') }}
-- Only include persons who have reached "paying customer" state
with paying_customers as (
select distinct entity_id
from {{ ref('high_level_customer_journey') }}
where state_value != 'lead'
),
current_journey_state as (
select
e.email,
hlcj.state_value
from {{ ref('high_level_customer_journey') }} hlcj
inner join {{ ref('nexus_entities') }} e
on hlcj.entity_id = e.entity_id
where hlcj.is_current = true
),
persons as (
{{ nexus.customer_io_identify(
entity_type='person',
user_id_column='email',
anonymous_id_column='segment_anonymous_id',
dedupe_column='email',
ignore_traits=[
'location_lobbie_integration_uuid',
'location_name',
'location_id',
'facebook_pixel_id',
'facebook_access_key',
'google_ads_account_id',
'facebook_account_id'
],
filters=[
"email IS NOT NULL",
"entity_id IN (SELECT entity_id FROM paying_customers)"
]
) }}
),
persons_with_state as (
select
p.*,
cjs.state_value as "customer_journey_state"
from persons p
left join current_journey_state cjs
on p."userId" = cjs.email
)
select * from persons_with_state
order by "timestamp" desc
Setting Up Customer.io Reverse ETL
Follow these steps to sync data from your Snowflake warehouse to Customer.io.
1. Connect to Snowflake
- In Customer.io, go to Data & Integrations > Integrations
- Click Directory tab
- Search for "Snowflake" and select Snowflake (Data in)
- Enter your Snowflake credentials (account, warehouse, database, schema, user, private key)
- Click Connect
2. Create Your Sync
- Go to Integrations > Snowflake > Syncs
- Click Add Sync
- Select your database
- Choose the type of data to sync (usually Identify for People or Group for Groups)
3. Define Your Query
Enter the following query, replacing YOUR_PRODUCTION_SCHEMA with your actual
schema name:
SELECT *
FROM YOUR_PRODUCTION_SCHEMA.customer_io
WHERE TO_TIMESTAMP_NTZ("_updated_at") > TO_TIMESTAMP_NTZ({{last_sync_time}})
4. Test Your Query
- Click Run Query to preview results
- Verify all column names are lowercase
- Ensure you have valid email addresses
- For initial testing, add
LIMIT 10and run in a development environment
5. Enable the Sync
- Set your sync frequency (e.g., every 15 minutes, hourly)
- Click Enable to start syncing
Important Notes
- First sync: On the first sync,
{{last_sync_time}}is0, so all records will be synced - Subsequent syncs: Only records updated since the last sync will be sent
- Updating existing syncs: When modifying an existing sync, the
{{last_sync_time}}filter may cause zero changes to be detected. To resolve this:- Run once without the WHERE clause, OR
- Create a new sync instead of modifying the existing one
Troubleshooting
Empty Output
If the model produces no rows, check:
- The
entity_typefilter matches your data - The
user_id_columnoranonymous_id_columnhas non-null values - Your custom
filtersaren't too restrictive
Missing Columns
If expected traits are missing:
- Verify the column exists in
nexus_entities - Check it's not in the
ignore_traitslist - Ensure it's not a system column
Duplicate Records
If you see duplicates after sync:
- Verify the
dedupe_columnis correct - Check for null values in the dedupe column (these won't deduplicate)
- Consider adding a filter to exclude null dedupe values
Events (Track Sync)
For tracking events in Customer.io, create a model that outputs data formatted for Customer.io's Track sync. Each row represents one event to be tracked against a person.
Required Columns
| Column | Description |
|---|---|
userId |
Email or user identifier (required for Track) |
anonymousId |
Segment anonymous ID (optional, alternative to userId) |
timestamp |
When the event occurred (use TO_TIMESTAMP_NTZ()) |
event |
Event name (e.g. appointment scheduled, payment completed) |
Event Properties
All additional columns become event properties in Customer.io. Common properties:
| Column | Description |
|---|---|
event_type |
Event category (e.g. appointment, transaction) |
value |
Numeric value (e.g. payment amount) |
value_unit |
Currency or unit (e.g. USD) |
event_data_source |
Source system (e.g. lobbie) |
Source-Specific Properties
Include relevant properties from your source system. For example, appointment events might include:
| Column | Description |
|---|---|
appointment_id |
Unique appointment identifier |
appointment_type |
Type of appointment |
appointment_start_datetime |
Scheduled start time |
appointment_end_datetime |
Scheduled end time |
appointment_number |
Nth appointment for this person |
location_id |
Location identifier |
location_name |
Location display name |
Payment events might include:
| Column | Description |
|---|---|
payment_number |
Nth payment for this person |
product_name |
Product or service purchased |
cost |
Payment amount |
contract_id |
Contract ID (for recurring payments) |
is_recurring |
Boolean flag for recurring payment |
payment_date |
Date payment was processed |
Example Model
-- models/output/customer_io/customer_io_events.sql
{{ config(materialized='view') }}
-- Only include events for persons already in customer_io_persons
with customer_io_person_ids as (
select "userId" as email
from {{ ref('customer_io_persons') }}
),
source_events as (
select
e.event_id,
e.occurred_at,
e.event_name,
e.event_type,
e.value,
e.value_unit,
e.source
from {{ ref('nexus_events') }} e
where e.source = 'your_source'
),
person_participants as (
select event_id, entity_id as person_entity_id
from {{ ref('nexus_entity_participants') }}
where entity_type = 'person'
),
persons as (
select e.entity_id, e.email, e.segment_anonymous_id
from {{ ref('nexus_entities') }} e
inner join customer_io_person_ids cip on e.email = cip.email
where e.entity_type = 'person'
),
events_with_person as (
select
se.*,
p.email,
p.segment_anonymous_id
from source_events se
inner join person_participants pp on se.event_id = pp.event_id
inner join persons p on pp.person_entity_id = p.entity_id
where p.email is not null
)
select
email as "userId",
segment_anonymous_id as "anonymousId",
to_timestamp_ntz(occurred_at) as "timestamp",
event_name as "event",
event_type as "event_type",
value as "value",
value_unit as "value_unit",
source as "event_data_source"
from events_with_person
order by "timestamp" desc
Reverse ETL Query
Use this query in Customer.io, replacing YOUR_SCHEMA with your actual schema:
SELECT *
FROM YOUR_SCHEMA.customer_io_events
WHERE TO_TIMESTAMP_NTZ("timestamp") > TO_TIMESTAMP_NTZ({{last_sync_time}})
Best Practices
- Use a Track sync type (not Identify) when setting up the sync in Customer.io
- Filter events to only include persons who exist in your
customer_io_personsmodel to avoid orphaned events - Use
{{last_sync_time}}to avoid duplicate traffic and improve performance - Event syncs only ingest events after
last_sync_time; backfilling requires a separate one-off sync without the WHERE clause - Quote column names (
"columnName") for Snowflake compatibility - Use
TO_TIMESTAMP_NTZ()for all timestamp columns
Objects (Group Sync)
Customer.io supports custom objects that represent non-person entities like companies, locations, accounts, or products. Objects can be related to people, enabling powerful segmentation like "people associated with location X" or "people whose primary account is Y".
Prerequisites: Create Object Type in Customer.io
Before syncing objects, create an object type in Customer.io:
- Go to Data & Integrations → Objects
- Click Create object type
- Configure the object type:
- Name: e.g.,
Location,Account,Company - Singular: e.g.,
location - Plural: e.g.,
locations - ID attribute: e.g.,
location_id
- Name: e.g.,
- Click Create
- Note the Object Type ID (visible in URL or settings) - you'll need this for your dbt model
Required Columns
| Column | Description |
|---|---|
groupId |
Unique identifier for the object (required) |
objectTypeId |
Customer.io object type ID (required, e.g., '1') |
timestamp |
When the object was last updated |
All additional columns become object attributes (traits).
Example: Location Objects
-- models/output/customer_io/customer_io_locations.sql
{{ config(materialized='view') }}
with locations as (
select
entity_id,
location_id,
location_name,
location_lobbie_integration_uuid,
_updated_at,
_created_at
from {{ ref('nexus_entities') }}
where entity_type = 'group'
and location_id is not null
)
select
-- Customer.io Group sync required fields
location_id as "groupId",
'1' as "objectTypeId", -- Your object type ID from Customer.io
to_timestamp_ntz(_updated_at) as "timestamp",
-- Object traits (stored as attributes on the location)
location_name as "location_name",
location_lobbie_integration_uuid as "location_lobbie_integration_uuid",
entity_id as "nexus_entity_id",
to_timestamp_ntz(_created_at) as "created_at"
from locations
order by "timestamp" desc
Reverse ETL Query
SELECT *
FROM YOUR_SCHEMA.customer_io_locations
WHERE TO_TIMESTAMP_NTZ("timestamp") > TO_TIMESTAMP_NTZ({{last_sync_time}})
Customer.io Setup
- Go to Data & Integrations → Integrations → Snowflake
- Click Add Sync
- Select Group as the sync type
- Configure column mappings:
- Object ID:
groupId - Object Type ID:
objectTypeId - User ID: Leave empty (creates objects without relationships)
- Timestamp:
timestamp
- Object ID:
- Map additional columns as object attributes
- Enable the sync
Relationships (Group Sync with userId)
Relationships connect people to objects. When you include a userId in a Group
sync, Customer.io creates (or updates) the relationship between that person and
the object.
Relationship Attributes
Customer.io supports storing attributes on the relationship itself, not just on the object. This is powerful for modeling things like:
- "Is this the person's primary location?"
- "When did they first interact with this location?"
- "What is their role in this account?"
Important: Relationship attributes must be passed as a JSON object in a
column named relationshipAttributes. Regular columns are stored on the object,
not the relationship.
Required Columns
| Column | Description |
|---|---|
groupId |
Object identifier (required) |
objectTypeId |
Customer.io object type ID (required) |
userId |
Person identifier - email (required for relationships) |
timestamp |
When the relationship was last updated |
relationshipAttributes |
JSON object with relationship-specific attributes |
Example: Person-Location Relationships
This model creates relationships between patients and the locations they've
visited, with attributes like is_primary_location and first_interaction_at:
-- models/output/customer_io/customer_io_location_relationships.sql
{{ config(materialized='view') }}
-- Only include relationships for persons in customer_io_persons
with customer_io_person_ids as (
select "userId" as email
from {{ ref('customer_io_persons') }}
),
-- Get all person-location relationships with stats
relationship_stats as (
select
r.relationship_id,
r.entity_a_id as person_id,
r.entity_b_id as group_id,
g.location_id,
g.location_name,
p.email,
r.established_at as first_interaction_at,
r.last_updated_at as last_interaction_at,
r._updated_at
from {{ ref('nexus_relationships') }} r
inner join {{ ref('nexus_entities') }} p
on r.entity_a_id = p.entity_id
and p.entity_type = 'person'
inner join {{ ref('nexus_entities') }} g
on r.entity_b_id = g.entity_id
and g.entity_type = 'group'
inner join customer_io_person_ids cip
on p.email = cip.email
where r.relationship_type = 'membership'
and r.entity_a_type = 'person'
and r.entity_b_type = 'group'
and g.location_id is not null
and p.email is not null
),
-- Rank locations for each person
ranked_relationships as (
select
*,
row_number() over (
partition by person_id
order by last_interaction_at desc
) as rank_by_recency,
row_number() over (
partition by person_id
order by first_interaction_at asc
) as rank_by_first,
count(*) over (partition by person_id) as location_count
from relationship_stats
),
final as (
select
-- Customer.io Group sync required fields
location_id as "groupId",
'1' as "objectTypeId",
email as "userId",
to_timestamp_ntz(_updated_at) as "timestamp",
-- Relationship attributes as JSON object
-- See: https://docs.customer.io/integrations/data-in/connections/reverse-etl/snowflake/#relationship-attributes
object_construct(
'is_primary_location', case when rank_by_recency = 1 then true else false end,
'is_most_recent_location', case when rank_by_recency = 1 then true else false end,
'is_first_location', case when rank_by_first = 1 then true else false end,
'has_multiple_locations', case when location_count > 1 then true else false end,
'location_count', location_count,
'first_interaction_at', to_timestamp_ntz(first_interaction_at),
'last_interaction_at', to_timestamp_ntz(last_interaction_at),
'location_name', location_name
) as "relationshipAttributes"
from ranked_relationships
)
select * from final
order by "timestamp" desc
Example relationshipAttributes Output
{
"is_primary_location": true,
"is_most_recent_location": true,
"is_first_location": false,
"has_multiple_locations": true,
"location_count": 2,
"first_interaction_at": "2025-01-15 10:30:00.000",
"last_interaction_at": "2025-09-20 14:45:00.000",
"location_name": "GameDay Mens Health - Downtown"
}
Reverse ETL Query
SELECT *
FROM YOUR_SCHEMA.customer_io_location_relationships
WHERE TO_TIMESTAMP_NTZ("timestamp") > TO_TIMESTAMP_NTZ({{last_sync_time}})
Customer.io Setup
- Go to Data & Integrations → Integrations → Snowflake
- Click Add Sync
- Select Group as the sync type
- Configure column mappings:
- Object ID:
groupId - Object Type ID:
objectTypeId - User ID:
userId(this creates the relationship!) - Timestamp:
timestamp - Relationship Attributes:
relationshipAttributes
- Object ID:
- Enable the sync
Sync Order
For initial setup, run syncs in this order:
- Persons first (creates people profiles)
- Objects second (creates location/account/etc. objects)
- Relationships third (links people to objects)
- Events can run anytime after Persons
After initial setup, syncs can run in parallel.
Segmentation Use Cases
With relationship attributes synced, you can create segments like:
- Primary location is X: Filter where
is_primary_location = truefor a specific location - Multi-location people: Filter where
has_multiple_locations = true - Lapsed at primary location: Filter where
is_primary_location = trueANDlast_interaction_atis more than 30 days ago - New at location: Filter where
is_first_location = trueANDfirst_interaction_atis within the last 7 days
Defining Relationship Declarations in Nexus
Before you can sync relationships to Customer.io, you need to define
relationship declarations in your source models. Nexus processes these
declarations through its identity resolution pipeline to create the final
nexus_relationships table.
Enable Relationships for Your Source
In your dbt_project.yml, enable relationships for your source:
vars:
nexus:
sources:
your_source:
enabled: true
events: true
entities: ['person', 'group']
relationships: true # Enable relationship processing
Create Relationship Declaration Model
Create a model named {source}_relationship_declarations.sql in your source
folder. This model should output the following columns:
| Column | Description |
|---|---|
relationship_declaration_id |
Unique ID (use nexus.create_nexus_id) |
event_id |
Event that established the relationship |
occurred_at |
When the relationship was established |
entity_a_identifier |
Identifier for entity A (e.g., patient_id) |
entity_a_identifier_type |
Type of identifier (e.g., 'patient_id') |
entity_a_type |
Entity type: 'person' or 'group' |
entity_a_role |
Role in relationship (e.g., 'patient') |
entity_b_identifier |
Identifier for entity B (e.g., location_id) |
entity_b_identifier_type |
Type of identifier (e.g., 'location_id') |
entity_b_type |
Entity type: 'person' or 'group' |
entity_b_role |
Role in relationship (e.g., 'location') |
relationship_type |
Type: 'membership', 'association', etc. |
relationship_direction |
Direction: 'a_to_b', 'b_to_a', 'bidirectional' |
is_active |
Whether relationship is currently active |
source |
Source system name |
Example: Patient-Location Relationships
-- models/sources/lobbie/intermediate/lobbie_patient_location_relationship_declarations.sql
{{ config(
materialized='table',
tags=['event-processing', 'relationships']
) }}
with appointment_relationships as (
select
event_id,
occurred_at,
patient_id,
location_id,
'lobbie' as source
from {{ ref('lobbie_appointment_events') }}
where patient_id is not null
and location_id is not null
),
relationship_declarations as (
select distinct
{{ nexus.create_nexus_id('relationship_declaration',
['event_id', 'patient_id', 'location_id', "'patient'", 'occurred_at'])
}} as relationship_declaration_id,
event_id,
occurred_at,
-- Entity A: Person (patient)
patient_id as entity_a_identifier,
'patient_id' as entity_a_identifier_type,
'person' as entity_a_type,
'patient' as entity_a_role,
-- Entity B: Group (location)
location_id as entity_b_identifier,
'location_id' as entity_b_identifier_type,
'group' as entity_b_type,
'location' as entity_b_role,
-- Relationship metadata
'membership' as relationship_type,
'a_to_b' as relationship_direction,
true as is_active,
source
from appointment_relationships
)
select * from relationship_declarations
Union Model
Create a top-level model that unions all relationship declarations for your source:
-- models/sources/lobbie/lobbie_relationship_declarations.sql
{{ config(
materialized='table',
tags=['event-processing', 'relationship_declarations']
) }}
select
relationship_declaration_id,
event_id,
occurred_at,
entity_a_identifier,
entity_a_identifier_type,
entity_a_type,
entity_a_role,
entity_b_identifier,
entity_b_identifier_type,
entity_b_type,
entity_b_role,
relationship_type,
relationship_direction,
is_active,
source
from {{ ref('lobbie_patient_location_relationship_declarations') }}
-- UNION ALL additional relationship types here
order by occurred_at desc
Run the Pipeline
After creating your relationship declaration models:
- Run
dbt runto build the models - Nexus will process the declarations through its identity resolution pipeline
- Final relationships appear in
nexus_relationships - Use
nexus_relationshipsas the source for your Customer.io relationship model