Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions sqlserver-cdc-capture/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# SQL Server CDC Capture Demo

A self-contained SQL Server environment with CDC enabled and a data generator
producing continuous inserts, updates, and deletes against a `sales` table.
Useful for demoing or testing an Estuary SQL Server CDC capture end-to-end.

## What's included

- `sqlserver/` — SQL Server 2022 with an init script that enables CDC, creates
the `sales` table, sets up the `flow_capture` user, and creates the
`flow_watermarks` table required by the Estuary connector.
- `datagen/` — Python container that inserts, updates, and deletes rows in
`dbo.sales` once per second.
- `ngrok` — exposes SQL Server publicly over TCP so the hosted Estuary
connector can reach it.
- `flow.yaml` — Estuary capture spec wiring the SQL Server source to a
collection in your tenant.

## Running it

Set your ngrok token and start the stack:

```bash
export NGROK_AUTHTOKEN=<your-token>
docker compose up -d
```

Grab the public host/port from the ngrok dashboard at http://localhost:4040.

## Creating the Estuary capture

`flow.yaml` is the spec deployed via [flowctl](https://docs.estuary.dev/concepts/flowctl/).
It defines a SQL Server CDC capture and one binding for the `dbo.sales` table:

```yaml
captures:
dani-demo/sqlserver-cdc/source-sqlserver:
endpoint:
connector:
image: ghcr.io/estuary/source-sqlserver:v0
config:
address: <ngrok-host>:<ngrok-port>
database: SampleDB
user: flow_capture
password: Secretsecret1
historyMode: false
bindings:
- resource:
namespace: dbo
stream: sales
target: dani-demo/sqlserver-cdc/dbo/sales
```

Before publishing, edit `address` to match the host/port from the ngrok
dashboard, and replace the `dani-demo/sqlserver-cdc/...` prefix with your own
tenant/prefix.

Then discover, publish, and verify:

```bash
# (Re-)run discovery to refresh bindings from the source — optional once
# bindings are present.
flowctl raw discover --source flow.yaml
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flowctl discover is now a top-level command, not just nested under raw.


# Publish the capture and the generated collection.
flowctl catalog publish --source flow.yaml --auto-approve

# Check status; first transition should be PENDING → BACKFILLING → OK.
flowctl catalog status dani-demo/sqlserver-cdc/source-sqlserver

# Peek at a few documents flowing through the collection.
flowctl collections read \
--collection dani-demo/sqlserver-cdc/dbo/sales \
--uncommitted | head
```

### Credentials

The `init.sql` script provisions a dedicated CDC user the capture connects as:

- user: `flow_capture`
- password: `Secretsecret1`
- database: `SampleDB`

It also grants the permissions the connector needs (`SELECT` on `dbo` and `cdc`,
plus `VIEW DATABASE STATE`) and enables CDC on `dbo.sales` and the
`dbo.flow_watermarks` table.
2 changes: 2 additions & 0 deletions sqlserver-cdc-capture/dani-demo/flow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import:
- sqlserver-cdc/flow.yaml
5 changes: 5 additions & 0 deletions sqlserver-cdc-capture/dani-demo/sqlserver-cdc/dbo/flow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
collections:
dani-demo/sqlserver-cdc/dbo/sales:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible nit, since the readme instructions do call it out, but it could be helpful to signpost that the prefix needs to change, like using <your-prefix>/ rather than dani-demo/.

The auto-generated directory structure also includes dani-demo as a folder. Along with the capture flow.yaml, it's not too many places the user needs to hunt down and replace text, but could be simplified.

schema: sales.schema.yaml
key:
- /sale_id
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
$defs:
DboSales:
type: object
required:
- sale_id
$anchor: DboSales
properties:
customer_id:
type: integer
description: '(source type: non-nullable int)'
product_id:
type: integer
description: '(source type: non-nullable int)'
quantity:
type: integer
description: '(source type: non-nullable int)'
sale_date:
type: string
format: date-time
description: '(source type: non-nullable datetime)'
sale_id:
type: integer
description: '(source type: non-nullable int)'
total_price:
type: number
description: '(source type: non-nullable float)'
unit_price:
type: number
description: '(source type: non-nullable float)'
allOf:
- if:
properties:
_meta:
properties:
op:
const: d
then:
reduce:
delete: true
strategy: merge
else:
reduce:
strategy: merge
required:
- _meta
properties:
_meta:
type: object
required:
- op
- source
properties:
before:
$ref: '#DboSales'
description: Record state immediately before this change was applied.
reduce:
strategy: firstWriteWins
op:
enum:
- c
- d
- u
description: 'Change operation type: ''c'' Create/Insert, ''u'' Update, ''d'' Delete.'
source:
properties:
ts_ms:
type: integer
description: Unix timestamp (in millis) at which this event was recorded by the database.
schema:
type: string
description: Database schema (namespace) of the event.
snapshot:
type: boolean
description: Snapshot is true if the record was produced from an initial table backfill and unset if produced from the replication log.
table:
type: string
description: Database table of the event.
lsn:
type: string
contentEncoding: base64
description: The LSN at which a CDC event occurred. Only set for CDC events
seqval:
type: string
contentEncoding: base64
description: Sequence value used to order changes to a row within a transaction. Only set for CDC events
updateMask:
description: A bit mask with a bit corresponding to each captured column identified for the capture instance. Only set for CDC events
type: object
required:
- schema
- table
- lsn
- seqval
reduce:
strategy: merge
- $ref: '#DboSales'
x-infer-schema: true
2 changes: 2 additions & 0 deletions sqlserver-cdc-capture/dani-demo/sqlserver-cdc/flow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import:
- dbo/flow.yaml
25 changes: 25 additions & 0 deletions sqlserver-cdc-capture/datagen/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
FROM python:3.12-bullseye

RUN apt-get update && apt-get install -y \
curl \
gnupg \
apt-transport-https \
ca-certificates \
unixodbc unixodbc-dev libgss3 odbcinst

# Add Microsoft SQL Server ODBC Driver 17 for Linux
RUN curl https://packages.microsoft.com/keys/microsoft.asc | tee /etc/apt/trusted.gpg.d/microsoft.asc \
&& curl https://packages.microsoft.com/config/debian/11/prod.list | tee /etc/apt/sources.list.d/mssql-release.list \
&& apt-get update \
&& ACCEPT_EULA=Y apt-get install msodbcsql18 --assume-yes \
&& ACCEPT_EULA=Y apt-get install -y mssql-tools18

# Copy and install Python dependencies
COPY requirements.txt /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt

# Copy application files
COPY . .

# Set the command to run the application
CMD ["python", "-u", "datagen.py"]
133 changes: 133 additions & 0 deletions sqlserver-cdc-capture/datagen/datagen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import os
import pyodbc
import time
from faker import Faker
import random

# Database connection parameters
DB_NAME = os.getenv("DB_NAME", "SampleDB")
DB_USER = os.getenv("DB_USER", "sa")
DB_PASSWORD = os.getenv("DB_PASSWORD", "SuperSecurePassword1")
DB_HOST = os.getenv("DB_HOST", "127.0.0.1")
DB_PORT = os.getenv("DB_PORT", "1433")

fake = Faker()


# Function to generate a new sample sale
def generate_sale():
product_id = random.randint(1, 100)
customer_id = random.randint(1, 1000)
sale_date = fake.date_time_this_year()
quantity = fake.random_int(min=1, max=10)
unit_price = round(random.uniform(10.0, 100.0), 2)
total_price = round(quantity * unit_price, 2)
return product_id, customer_id, sale_date, quantity, unit_price, total_price


# Function to insert a new sale into the database
def insert_sale(conn, sale):
cursor = conn.cursor()
cursor.execute(
"INSERT INTO sales (product_id, customer_id, sale_date, quantity, unit_price, total_price) VALUES (?, ?, ?, ?, ?, ?)",
sale,
)
conn.commit()
cursor.close()


# Function to update a sale in the database
def update_sale(conn, new_sale):
cursor = conn.cursor()
cursor.execute("SELECT TOP 1 sale_id FROM sales ORDER BY NEWID()")
row = cursor.fetchone()
if row:
sale_id = row[0]
cursor.execute(
"UPDATE sales SET product_id=?, customer_id=?, sale_date=?, quantity=?, unit_price=?, total_price=? WHERE sale_id=?",
(*new_sale, sale_id),
)
conn.commit()
print("Updated sale ID", sale_id, "with new data:", new_sale)
else:
print("No sales found in the database, skipping update.")
cursor.close()


# Function to delete a sale from the database
def delete_sale(conn):
cursor = conn.cursor()
cursor.execute("SELECT TOP 1 sale_id FROM sales ORDER BY NEWID()")
row = cursor.fetchone()
if row:
sale_id = row[0]
cursor.execute("DELETE FROM sales WHERE sale_id=?", (sale_id,))
conn.commit()
print("Deleted sale ID", sale_id)
else:
print("No sales found in the database, skipping delete.")
cursor.close()


def connect_to_database(conn_str, retries=3, delay=5):
attempt = 0
while attempt < retries:
try:
conn = pyodbc.connect(conn_str)
print("Connected to the database!")
return conn
except pyodbc.Error as e:
print(f"Connection failed: {e}")
attempt += 1
if attempt < retries:
print(f"Retrying in {delay} seconds...")
time.sleep(delay)
else:
print("All retries failed. Unable to connect to the database.")
return None


def main():
conn_str = (
"DRIVER={ODBC Driver 18 for SQL Server};"
f"SERVER={DB_HOST},{DB_PORT};"
f"DATABASE={DB_NAME};"
f"UID={DB_USER};"
f"PWD={DB_PASSWORD};"
f"TrustServerCertificate=yes"
)
conn = connect_to_database(conn_str)

if not conn:
print("Unable to connect to the database.")
exit(1)

print("Connected to the database!")

# Main loop to continuously insert, update, or delete sales
try:
while True:
action = random.choices(
["insert", "delete", "update"], weights=[0.7, 0.2, 0.1], k=1
)[0]

if action == "insert":
new_sale = generate_sale()
insert_sale(conn, new_sale)
print("Inserted new sale:", new_sale)
elif action == "update":
new_sale = generate_sale()
update_sale(conn, new_sale)
elif action == "delete":
delete_sale(conn)

time.sleep(1) # Wait for 1 second before the next operation
except KeyboardInterrupt:
print("Process interrupted by user.")
finally:
conn.close()
print("Database connection closed.")


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions sqlserver-cdc-capture/datagen/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Faker==25.1.0
pyodbc==5.1.0
Loading