Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions sqlserver-cdc-capture/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# SQL Server CDC Capture Demo

A self-contained SQL Server environment with CDC enabled and a data generator
producing continuous inserts, updates, and deletes against a `sales` table.
Useful for demoing or testing an Estuary SQL Server CDC capture end-to-end.

## What's included

- `sqlserver/` — SQL Server 2022 with an init script that enables CDC, creates
the `sales` table, and sets up the `flow_capture` user.
- `datagen/` — Python container that inserts, updates, and deletes rows in
`dbo.sales` once per second.
- `ngrok` — exposes SQL Server publicly over TCP so the hosted Estuary
connector can reach it.
- `flow.yaml` — Estuary capture spec wiring the SQL Server source to a
collection in your tenant.

## Running it

Set your ngrok token and start the stack:

```bash
export NGROK_AUTHTOKEN=<your-token>
docker compose up -d
```

Grab the public host/port from the ngrok dashboard at http://localhost:4040.

## Creating the Estuary capture

`flow.yaml` is the spec deployed via [flowctl](https://docs.estuary.dev/concepts/flowctl/).
It defines a SQL Server CDC capture and one binding for the `dbo.sales` table:

```yaml
captures:
your-prefix/sqlserver-cdc/source-sqlserver:
endpoint:
connector:
image: ghcr.io/estuary/source-sqlserver:v0
config:
address: <ngrok-host>:<ngrok-port>
database: SampleDB
user: flow_capture
password: Secretsecret1
historyMode: false
bindings:
- resource:
namespace: dbo
stream: sales
target: your-prefix/sqlserver-cdc/dbo/sales
```

Before publishing, edit `address` to match the host/port from the ngrok
dashboard, and replace `your-prefix` with your own tenant/prefix everywhere it
appears (in `flow.yaml`, in the imported yamls under `your-prefix/`, and the
directory name itself).

Then discover, publish, and verify:

```bash
# (Re-)run discovery to refresh bindings from the source — optional once
# bindings are present.
flowctl discover --source flow.yaml

# Publish the capture and the generated collection.
flowctl catalog publish --source flow.yaml --auto-approve

# Check status; first transition should be PENDING → BACKFILLING → OK.
flowctl catalog status your-prefix/sqlserver-cdc/source-sqlserver

# Peek at a few documents flowing through the collection.
flowctl collections read \
--collection your-prefix/sqlserver-cdc/dbo/sales \
--uncommitted | head
```

### Credentials

The `init.sql` script provisions a dedicated CDC user the capture connects as:

- user: `flow_capture`
- password: `Secretsecret1`
- database: `SampleDB`

It also grants the permissions the connector needs (`SELECT` on `dbo` and `cdc`,
plus `VIEW DATABASE STATE`) and enables CDC on `dbo.sales`.
25 changes: 25 additions & 0 deletions sqlserver-cdc-capture/datagen/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
FROM python:3.12-bullseye

RUN apt-get update && apt-get install -y \
curl \
gnupg \
apt-transport-https \
ca-certificates \
unixodbc unixodbc-dev libgss3 odbcinst

# Add Microsoft SQL Server ODBC Driver 17 for Linux
RUN curl https://packages.microsoft.com/keys/microsoft.asc | tee /etc/apt/trusted.gpg.d/microsoft.asc \
&& curl https://packages.microsoft.com/config/debian/11/prod.list | tee /etc/apt/sources.list.d/mssql-release.list \
&& apt-get update \
&& ACCEPT_EULA=Y apt-get install msodbcsql18 --assume-yes \
&& ACCEPT_EULA=Y apt-get install -y mssql-tools18

# Copy and install Python dependencies
COPY requirements.txt /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt

# Copy application files
COPY . .

# Set the command to run the application
CMD ["python", "-u", "datagen.py"]
133 changes: 133 additions & 0 deletions sqlserver-cdc-capture/datagen/datagen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import os
import pyodbc
import time
from faker import Faker
import random

# Database connection parameters
DB_NAME = os.getenv("DB_NAME", "SampleDB")
DB_USER = os.getenv("DB_USER", "sa")
DB_PASSWORD = os.getenv("DB_PASSWORD", "SuperSecurePassword1")
DB_HOST = os.getenv("DB_HOST", "127.0.0.1")
DB_PORT = os.getenv("DB_PORT", "1433")

fake = Faker()


# Function to generate a new sample sale
def generate_sale():
product_id = random.randint(1, 100)
customer_id = random.randint(1, 1000)
sale_date = fake.date_time_this_year()
quantity = fake.random_int(min=1, max=10)
unit_price = round(random.uniform(10.0, 100.0), 2)
total_price = round(quantity * unit_price, 2)
return product_id, customer_id, sale_date, quantity, unit_price, total_price


# Function to insert a new sale into the database
def insert_sale(conn, sale):
cursor = conn.cursor()
cursor.execute(
"INSERT INTO sales (product_id, customer_id, sale_date, quantity, unit_price, total_price) VALUES (?, ?, ?, ?, ?, ?)",
sale,
)
conn.commit()
cursor.close()


# Function to update a sale in the database
def update_sale(conn, new_sale):
cursor = conn.cursor()
cursor.execute("SELECT TOP 1 sale_id FROM sales ORDER BY NEWID()")
row = cursor.fetchone()
if row:
sale_id = row[0]
cursor.execute(
"UPDATE sales SET product_id=?, customer_id=?, sale_date=?, quantity=?, unit_price=?, total_price=? WHERE sale_id=?",
(*new_sale, sale_id),
)
conn.commit()
print("Updated sale ID", sale_id, "with new data:", new_sale)
else:
print("No sales found in the database, skipping update.")
cursor.close()


# Function to delete a sale from the database
def delete_sale(conn):
cursor = conn.cursor()
cursor.execute("SELECT TOP 1 sale_id FROM sales ORDER BY NEWID()")
row = cursor.fetchone()
if row:
sale_id = row[0]
cursor.execute("DELETE FROM sales WHERE sale_id=?", (sale_id,))
conn.commit()
print("Deleted sale ID", sale_id)
else:
print("No sales found in the database, skipping delete.")
cursor.close()


def connect_to_database(conn_str, retries=3, delay=5):
attempt = 0
while attempt < retries:
try:
conn = pyodbc.connect(conn_str)
print("Connected to the database!")
return conn
except pyodbc.Error as e:
print(f"Connection failed: {e}")
attempt += 1
if attempt < retries:
print(f"Retrying in {delay} seconds...")
time.sleep(delay)
else:
print("All retries failed. Unable to connect to the database.")
return None


def main():
conn_str = (
"DRIVER={ODBC Driver 18 for SQL Server};"
f"SERVER={DB_HOST},{DB_PORT};"
f"DATABASE={DB_NAME};"
f"UID={DB_USER};"
f"PWD={DB_PASSWORD};"
f"TrustServerCertificate=yes"
)
conn = connect_to_database(conn_str)

if not conn:
print("Unable to connect to the database.")
exit(1)

print("Connected to the database!")

# Main loop to continuously insert, update, or delete sales
try:
while True:
action = random.choices(
["insert", "delete", "update"], weights=[0.7, 0.2, 0.1], k=1
)[0]

if action == "insert":
new_sale = generate_sale()
insert_sale(conn, new_sale)
print("Inserted new sale:", new_sale)
elif action == "update":
new_sale = generate_sale()
update_sale(conn, new_sale)
elif action == "delete":
delete_sale(conn)

time.sleep(1) # Wait for 1 second before the next operation
except KeyboardInterrupt:
print("Process interrupted by user.")
finally:
conn.close()
print("Database connection closed.")


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions sqlserver-cdc-capture/datagen/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Faker==25.1.0
pyodbc==5.1.0
45 changes: 45 additions & 0 deletions sqlserver-cdc-capture/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
services:

datagen:
build: datagen
container_name: sql-server-datagen
restart: unless-stopped

environment:
DB_HOST: sql-server
DB_PORT: 1433
DB_NAME: SampleDB
DB_USER: sa
DB_PASSWORD: "SuperSecurePassword1"

depends_on:
sql-server:
condition: service_healthy

sql-server:
container_name: sql-server
build: sqlserver
restart: always
ports:
- "1433:1433"
environment:
SA_PASSWORD: "SuperSecurePassword1"
ACCEPT_EULA: "Y"
MSSQL_AGENT_ENABLED: true
MSSQL_PID: "Developer"
healthcheck:
test: /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P "$$SA_PASSWORD" -C -Q "SELECT 1" || exit 1
interval: 10s
timeout: 3s
retries: 10
start_period: 10s

ngrok:
image: ngrok/ngrok:latest
container_name: sql-server-ngrok
restart: unless-stopped
environment:
NGROK_AUTHTOKEN: ${NGROK_AUTHTOKEN}
command: 'tcp sql-server:1433'
ports:
- 4040:4040
18 changes: 18 additions & 0 deletions sqlserver-cdc-capture/flow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import:
- your-prefix/flow.yaml
captures:
your-prefix/sqlserver-cdc/source-sqlserver:
endpoint:
connector:
image: ghcr.io/estuary/source-sqlserver:v0
config:
address: 0.tcp.sa.ngrok.io:11059
database: SampleDB
user: flow_capture
password: Secretsecret1
historyMode: false
bindings:
- resource:
namespace: dbo
stream: sales
target: your-prefix/sqlserver-cdc/dbo/sales
16 changes: 16 additions & 0 deletions sqlserver-cdc-capture/sqlserver/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM mcr.microsoft.com/mssql/server:2022-latest

# Switch to root to create the directory and copy the script
USER root

# Create directory for initialization scripts
RUN mkdir -p /var/opt/mssql/scripts

# Copy the initialization script
COPY init.sql /var/opt/mssql/scripts/init.sql

# Switch back to mssql user
USER mssql

# Set the entrypoint to run the SQL Server and initialization script
CMD /bin/bash -c '/opt/mssql/bin/sqlservr & sleep 30 && /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P $SA_PASSWORD -C -i /var/opt/mssql/scripts/init.sql && tail -f /dev/null'
63 changes: 63 additions & 0 deletions sqlserver-cdc-capture/sqlserver/init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
-- Use master database to create a new database
USE master;
GO

-- Create a new database if it doesn't already exist
IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = N'SampleDB')
BEGIN
CREATE DATABASE SampleDB;
END
GO

-- Switch to the new database
USE SampleDB;
GO

-- Enable CDC for the database if it's not already enabled
IF NOT EXISTS (SELECT name FROM sys.databases WHERE is_cdc_enabled = 1 AND name = N'SampleDB')
BEGIN
EXEC sys.sp_cdc_enable_db;
END
GO

-- Create login if it doesn't already exist
IF NOT EXISTS (SELECT name FROM sys.server_principals WHERE name = N'flow_capture')
BEGIN
CREATE LOGIN flow_capture WITH PASSWORD = 'Secretsecret1';
END
GO

-- Create user if it doesn't already exist
IF NOT EXISTS (SELECT name FROM sys.database_principals WHERE name = N'flow_capture')
BEGIN
CREATE USER flow_capture FOR LOGIN flow_capture;
END
GO

-- Grant the user permissions on the CDC schema and schemas with data
GRANT SELECT ON SCHEMA::dbo TO flow_capture;
GRANT SELECT ON SCHEMA::cdc TO flow_capture;
GRANT VIEW DATABASE STATE TO flow_capture;
GO

-- Create the sales table if it doesn't already exist
IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = 'sales')
BEGIN
CREATE TABLE dbo.sales (
sale_id INT IDENTITY(1,1) PRIMARY KEY,
product_id INT NOT NULL,
customer_id INT NOT NULL,
sale_date DATETIME NOT NULL,
quantity INT NOT NULL,
unit_price FLOAT NOT NULL,
total_price FLOAT NOT NULL
);
END
GO

-- Enable CDC on the sales table if it's not already enabled
IF NOT EXISTS (SELECT * FROM cdc.change_tables WHERE source_object_id = OBJECT_ID('dbo.sales'))
BEGIN
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'sales', @role_name = 'flow_capture';
END
GO
2 changes: 2 additions & 0 deletions sqlserver-cdc-capture/your-prefix/flow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import:
- sqlserver-cdc/flow.yaml
Loading