diff --git a/sqlserver-cdc-capture/README.md b/sqlserver-cdc-capture/README.md new file mode 100644 index 0000000..cadf944 --- /dev/null +++ b/sqlserver-cdc-capture/README.md @@ -0,0 +1,86 @@ +# SQL Server CDC Capture Demo + +A self-contained SQL Server environment with CDC enabled and a data generator +producing continuous inserts, updates, and deletes against a `sales` table. +Useful for demoing or testing an Estuary SQL Server CDC capture end-to-end. + +## What's included + +- `sqlserver/` — SQL Server 2022 with an init script that enables CDC, creates + the `sales` table, and sets up the `flow_capture` user. +- `datagen/` — Python container that inserts, updates, and deletes rows in + `dbo.sales` once per second. +- `ngrok` — exposes SQL Server publicly over TCP so the hosted Estuary + connector can reach it. +- `flow.yaml` — Estuary capture spec wiring the SQL Server source to a + collection in your tenant. + +## Running it + +Set your ngrok token and start the stack: + +```bash +export NGROK_AUTHTOKEN= +docker compose up -d +``` + +Grab the public host/port from the ngrok dashboard at http://localhost:4040. + +## Creating the Estuary capture + +`flow.yaml` is the spec deployed via [flowctl](https://docs.estuary.dev/concepts/flowctl/). +It defines a SQL Server CDC capture and one binding for the `dbo.sales` table: + +```yaml +captures: + your-prefix/sqlserver-cdc/source-sqlserver: + endpoint: + connector: + image: ghcr.io/estuary/source-sqlserver:v0 + config: + address: : + database: SampleDB + user: flow_capture + password: Secretsecret1 + historyMode: false + bindings: + - resource: + namespace: dbo + stream: sales + target: your-prefix/sqlserver-cdc/dbo/sales +``` + +Before publishing, edit `address` to match the host/port from the ngrok +dashboard, and replace `your-prefix` with your own tenant/prefix everywhere it +appears (in `flow.yaml`, in the imported yamls under `your-prefix/`, and the +directory name itself). + +Then discover, publish, and verify: + +```bash +# (Re-)run discovery to refresh bindings from the source — optional once +# bindings are present. +flowctl discover --source flow.yaml + +# Publish the capture and the generated collection. +flowctl catalog publish --source flow.yaml --auto-approve + +# Check status; first transition should be PENDING → BACKFILLING → OK. +flowctl catalog status your-prefix/sqlserver-cdc/source-sqlserver + +# Peek at a few documents flowing through the collection. +flowctl collections read \ + --collection your-prefix/sqlserver-cdc/dbo/sales \ + --uncommitted | head +``` + +### Credentials + +The `init.sql` script provisions a dedicated CDC user the capture connects as: + +- user: `flow_capture` +- password: `Secretsecret1` +- database: `SampleDB` + +It also grants the permissions the connector needs (`SELECT` on `dbo` and `cdc`, +plus `VIEW DATABASE STATE`) and enables CDC on `dbo.sales`. diff --git a/sqlserver-cdc-capture/datagen/Dockerfile b/sqlserver-cdc-capture/datagen/Dockerfile new file mode 100644 index 0000000..3d2c835 --- /dev/null +++ b/sqlserver-cdc-capture/datagen/Dockerfile @@ -0,0 +1,25 @@ +FROM python:3.12-bullseye + +RUN apt-get update && apt-get install -y \ + curl \ + gnupg \ + apt-transport-https \ + ca-certificates \ + unixodbc unixodbc-dev libgss3 odbcinst + +# Add Microsoft SQL Server ODBC Driver 17 for Linux +RUN curl https://packages.microsoft.com/keys/microsoft.asc | tee /etc/apt/trusted.gpg.d/microsoft.asc \ + && curl https://packages.microsoft.com/config/debian/11/prod.list | tee /etc/apt/sources.list.d/mssql-release.list \ + && apt-get update \ + && ACCEPT_EULA=Y apt-get install msodbcsql18 --assume-yes \ + && ACCEPT_EULA=Y apt-get install -y mssql-tools18 + +# Copy and install Python dependencies +COPY requirements.txt /tmp/requirements.txt +RUN pip install -r /tmp/requirements.txt + +# Copy application files +COPY . . + +# Set the command to run the application +CMD ["python", "-u", "datagen.py"] diff --git a/sqlserver-cdc-capture/datagen/datagen.py b/sqlserver-cdc-capture/datagen/datagen.py new file mode 100644 index 0000000..d137942 --- /dev/null +++ b/sqlserver-cdc-capture/datagen/datagen.py @@ -0,0 +1,133 @@ +import os +import pyodbc +import time +from faker import Faker +import random + +# Database connection parameters +DB_NAME = os.getenv("DB_NAME", "SampleDB") +DB_USER = os.getenv("DB_USER", "sa") +DB_PASSWORD = os.getenv("DB_PASSWORD", "SuperSecurePassword1") +DB_HOST = os.getenv("DB_HOST", "127.0.0.1") +DB_PORT = os.getenv("DB_PORT", "1433") + +fake = Faker() + + +# Function to generate a new sample sale +def generate_sale(): + product_id = random.randint(1, 100) + customer_id = random.randint(1, 1000) + sale_date = fake.date_time_this_year() + quantity = fake.random_int(min=1, max=10) + unit_price = round(random.uniform(10.0, 100.0), 2) + total_price = round(quantity * unit_price, 2) + return product_id, customer_id, sale_date, quantity, unit_price, total_price + + +# Function to insert a new sale into the database +def insert_sale(conn, sale): + cursor = conn.cursor() + cursor.execute( + "INSERT INTO sales (product_id, customer_id, sale_date, quantity, unit_price, total_price) VALUES (?, ?, ?, ?, ?, ?)", + sale, + ) + conn.commit() + cursor.close() + + +# Function to update a sale in the database +def update_sale(conn, new_sale): + cursor = conn.cursor() + cursor.execute("SELECT TOP 1 sale_id FROM sales ORDER BY NEWID()") + row = cursor.fetchone() + if row: + sale_id = row[0] + cursor.execute( + "UPDATE sales SET product_id=?, customer_id=?, sale_date=?, quantity=?, unit_price=?, total_price=? WHERE sale_id=?", + (*new_sale, sale_id), + ) + conn.commit() + print("Updated sale ID", sale_id, "with new data:", new_sale) + else: + print("No sales found in the database, skipping update.") + cursor.close() + + +# Function to delete a sale from the database +def delete_sale(conn): + cursor = conn.cursor() + cursor.execute("SELECT TOP 1 sale_id FROM sales ORDER BY NEWID()") + row = cursor.fetchone() + if row: + sale_id = row[0] + cursor.execute("DELETE FROM sales WHERE sale_id=?", (sale_id,)) + conn.commit() + print("Deleted sale ID", sale_id) + else: + print("No sales found in the database, skipping delete.") + cursor.close() + + +def connect_to_database(conn_str, retries=3, delay=5): + attempt = 0 + while attempt < retries: + try: + conn = pyodbc.connect(conn_str) + print("Connected to the database!") + return conn + except pyodbc.Error as e: + print(f"Connection failed: {e}") + attempt += 1 + if attempt < retries: + print(f"Retrying in {delay} seconds...") + time.sleep(delay) + else: + print("All retries failed. Unable to connect to the database.") + return None + + +def main(): + conn_str = ( + "DRIVER={ODBC Driver 18 for SQL Server};" + f"SERVER={DB_HOST},{DB_PORT};" + f"DATABASE={DB_NAME};" + f"UID={DB_USER};" + f"PWD={DB_PASSWORD};" + f"TrustServerCertificate=yes" + ) + conn = connect_to_database(conn_str) + + if not conn: + print("Unable to connect to the database.") + exit(1) + + print("Connected to the database!") + + # Main loop to continuously insert, update, or delete sales + try: + while True: + action = random.choices( + ["insert", "delete", "update"], weights=[0.7, 0.2, 0.1], k=1 + )[0] + + if action == "insert": + new_sale = generate_sale() + insert_sale(conn, new_sale) + print("Inserted new sale:", new_sale) + elif action == "update": + new_sale = generate_sale() + update_sale(conn, new_sale) + elif action == "delete": + delete_sale(conn) + + time.sleep(1) # Wait for 1 second before the next operation + except KeyboardInterrupt: + print("Process interrupted by user.") + finally: + conn.close() + print("Database connection closed.") + + +if __name__ == "__main__": + main() diff --git a/sqlserver-cdc-capture/datagen/requirements.txt b/sqlserver-cdc-capture/datagen/requirements.txt new file mode 100644 index 0000000..b6d5e46 --- /dev/null +++ b/sqlserver-cdc-capture/datagen/requirements.txt @@ -0,0 +1,2 @@ +Faker==25.1.0 +pyodbc==5.1.0 diff --git a/sqlserver-cdc-capture/docker-compose.yml b/sqlserver-cdc-capture/docker-compose.yml new file mode 100644 index 0000000..de21664 --- /dev/null +++ b/sqlserver-cdc-capture/docker-compose.yml @@ -0,0 +1,45 @@ +services: + + datagen: + build: datagen + container_name: sql-server-datagen + restart: unless-stopped + + environment: + DB_HOST: sql-server + DB_PORT: 1433 + DB_NAME: SampleDB + DB_USER: sa + DB_PASSWORD: "SuperSecurePassword1" + + depends_on: + sql-server: + condition: service_healthy + + sql-server: + container_name: sql-server + build: sqlserver + restart: always + ports: + - "1433:1433" + environment: + SA_PASSWORD: "SuperSecurePassword1" + ACCEPT_EULA: "Y" + MSSQL_AGENT_ENABLED: true + MSSQL_PID: "Developer" + healthcheck: + test: /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P "$$SA_PASSWORD" -C -Q "SELECT 1" || exit 1 + interval: 10s + timeout: 3s + retries: 10 + start_period: 10s + + ngrok: + image: ngrok/ngrok:latest + container_name: sql-server-ngrok + restart: unless-stopped + environment: + NGROK_AUTHTOKEN: ${NGROK_AUTHTOKEN} + command: 'tcp sql-server:1433' + ports: + - 4040:4040 diff --git a/sqlserver-cdc-capture/flow.yaml b/sqlserver-cdc-capture/flow.yaml new file mode 100644 index 0000000..1b7e37c --- /dev/null +++ b/sqlserver-cdc-capture/flow.yaml @@ -0,0 +1,18 @@ +import: +- your-prefix/flow.yaml +captures: + your-prefix/sqlserver-cdc/source-sqlserver: + endpoint: + connector: + image: ghcr.io/estuary/source-sqlserver:v0 + config: + address: 0.tcp.sa.ngrok.io:11059 + database: SampleDB + user: flow_capture + password: Secretsecret1 + historyMode: false + bindings: + - resource: + namespace: dbo + stream: sales + target: your-prefix/sqlserver-cdc/dbo/sales diff --git a/sqlserver-cdc-capture/sqlserver/Dockerfile b/sqlserver-cdc-capture/sqlserver/Dockerfile new file mode 100644 index 0000000..46fecad --- /dev/null +++ b/sqlserver-cdc-capture/sqlserver/Dockerfile @@ -0,0 +1,16 @@ +FROM mcr.microsoft.com/mssql/server:2022-latest + +# Switch to root to create the directory and copy the script +USER root + +# Create directory for initialization scripts +RUN mkdir -p /var/opt/mssql/scripts + +# Copy the initialization script +COPY init.sql /var/opt/mssql/scripts/init.sql + +# Switch back to mssql user +USER mssql + +# Set the entrypoint to run the SQL Server and initialization script +CMD /bin/bash -c '/opt/mssql/bin/sqlservr & sleep 30 && /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P $SA_PASSWORD -C -i /var/opt/mssql/scripts/init.sql && tail -f /dev/null' diff --git a/sqlserver-cdc-capture/sqlserver/init.sql b/sqlserver-cdc-capture/sqlserver/init.sql new file mode 100644 index 0000000..9a6453f --- /dev/null +++ b/sqlserver-cdc-capture/sqlserver/init.sql @@ -0,0 +1,63 @@ +-- Use master database to create a new database +USE master; +GO + +-- Create a new database if it doesn't already exist +IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = N'SampleDB') +BEGIN + CREATE DATABASE SampleDB; +END +GO + +-- Switch to the new database +USE SampleDB; +GO + +-- Enable CDC for the database if it's not already enabled +IF NOT EXISTS (SELECT name FROM sys.databases WHERE is_cdc_enabled = 1 AND name = N'SampleDB') +BEGIN + EXEC sys.sp_cdc_enable_db; +END +GO + +-- Create login if it doesn't already exist +IF NOT EXISTS (SELECT name FROM sys.server_principals WHERE name = N'flow_capture') +BEGIN + CREATE LOGIN flow_capture WITH PASSWORD = 'Secretsecret1'; +END +GO + +-- Create user if it doesn't already exist +IF NOT EXISTS (SELECT name FROM sys.database_principals WHERE name = N'flow_capture') +BEGIN + CREATE USER flow_capture FOR LOGIN flow_capture; +END +GO + +-- Grant the user permissions on the CDC schema and schemas with data +GRANT SELECT ON SCHEMA::dbo TO flow_capture; +GRANT SELECT ON SCHEMA::cdc TO flow_capture; +GRANT VIEW DATABASE STATE TO flow_capture; +GO + +-- Create the sales table if it doesn't already exist +IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = 'sales') +BEGIN + CREATE TABLE dbo.sales ( + sale_id INT IDENTITY(1,1) PRIMARY KEY, + product_id INT NOT NULL, + customer_id INT NOT NULL, + sale_date DATETIME NOT NULL, + quantity INT NOT NULL, + unit_price FLOAT NOT NULL, + total_price FLOAT NOT NULL + ); +END +GO + +-- Enable CDC on the sales table if it's not already enabled +IF NOT EXISTS (SELECT * FROM cdc.change_tables WHERE source_object_id = OBJECT_ID('dbo.sales')) +BEGIN + EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'sales', @role_name = 'flow_capture'; +END +GO diff --git a/sqlserver-cdc-capture/your-prefix/flow.yaml b/sqlserver-cdc-capture/your-prefix/flow.yaml new file mode 100644 index 0000000..d15698e --- /dev/null +++ b/sqlserver-cdc-capture/your-prefix/flow.yaml @@ -0,0 +1,2 @@ +import: +- sqlserver-cdc/flow.yaml diff --git a/sqlserver-cdc-capture/your-prefix/sqlserver-cdc/dbo/flow.yaml b/sqlserver-cdc-capture/your-prefix/sqlserver-cdc/dbo/flow.yaml new file mode 100644 index 0000000..d3157ce --- /dev/null +++ b/sqlserver-cdc-capture/your-prefix/sqlserver-cdc/dbo/flow.yaml @@ -0,0 +1,5 @@ +collections: + your-prefix/sqlserver-cdc/dbo/sales: + schema: sales.schema.yaml + key: + - /sale_id diff --git a/sqlserver-cdc-capture/your-prefix/sqlserver-cdc/dbo/sales.schema.yaml b/sqlserver-cdc-capture/your-prefix/sqlserver-cdc/dbo/sales.schema.yaml new file mode 100644 index 0000000..0ecac3f --- /dev/null +++ b/sqlserver-cdc-capture/your-prefix/sqlserver-cdc/dbo/sales.schema.yaml @@ -0,0 +1,97 @@ +$defs: + DboSales: + type: object + required: + - sale_id + $anchor: DboSales + properties: + customer_id: + type: integer + description: '(source type: non-nullable int)' + product_id: + type: integer + description: '(source type: non-nullable int)' + quantity: + type: integer + description: '(source type: non-nullable int)' + sale_date: + type: string + format: date-time + description: '(source type: non-nullable datetime)' + sale_id: + type: integer + description: '(source type: non-nullable int)' + total_price: + type: number + description: '(source type: non-nullable float)' + unit_price: + type: number + description: '(source type: non-nullable float)' +allOf: +- if: + properties: + _meta: + properties: + op: + const: d + then: + reduce: + delete: true + strategy: merge + else: + reduce: + strategy: merge + required: + - _meta + properties: + _meta: + type: object + required: + - op + - source + properties: + before: + $ref: '#DboSales' + description: Record state immediately before this change was applied. + reduce: + strategy: firstWriteWins + op: + enum: + - c + - d + - u + description: 'Change operation type: ''c'' Create/Insert, ''u'' Update, ''d'' Delete.' + source: + properties: + ts_ms: + type: integer + description: Unix timestamp (in millis) at which this event was recorded by the database. + schema: + type: string + description: Database schema (namespace) of the event. + snapshot: + type: boolean + description: Snapshot is true if the record was produced from an initial table backfill and unset if produced from the replication log. + table: + type: string + description: Database table of the event. + lsn: + type: string + contentEncoding: base64 + description: The LSN at which a CDC event occurred. Only set for CDC events + seqval: + type: string + contentEncoding: base64 + description: Sequence value used to order changes to a row within a transaction. Only set for CDC events + updateMask: + description: A bit mask with a bit corresponding to each captured column identified for the capture instance. Only set for CDC events + type: object + required: + - schema + - table + - lsn + - seqval + reduce: + strategy: merge +- $ref: '#DboSales' +x-infer-schema: true diff --git a/sqlserver-cdc-capture/your-prefix/sqlserver-cdc/flow.yaml b/sqlserver-cdc-capture/your-prefix/sqlserver-cdc/flow.yaml new file mode 100644 index 0000000..4d5fbbf --- /dev/null +++ b/sqlserver-cdc-capture/your-prefix/sqlserver-cdc/flow.yaml @@ -0,0 +1,2 @@ +import: +- dbo/flow.yaml