02:17 a.m. — A €4 999 withdrawal request pops up at an ATM in a country the cardholder has never visited. Three seconds later the transaction is blocked, the customer gets an SMS and the fraud attempt is over. That’s how FinBank launched “3‑Second Shield”: a mixed squad of Data Engineers and analysts that moved fraud scoring inside Snowflake with Snowpark and a Kafka stream.
1️⃣ The Business Challenge
In banking, every second costs money —and reputation. Legacy rules engines took 30 s – 2 min to approve or deny a payment, giving fraudsters time to swipe again. The new target was < 3 s TTL (time‑to‑lock) from the moment a message appears in Kafka to the moment the core system decides.
2️⃣ High‑Level Architecture (visual first)
graph LR
subgraph Real‑time Payment
A[<< ISO‑8583 msg >>] --> B(Kafka topic `transactions`)
end
subgraph Ingest
B -->|Kafka Connector 2.4 +<br/>Snowpipe Streaming| C[(TABLE RAW.TRX_RAW)]
end
C -->|Stream| D>Snowpark UDF<br/>`SCORE_FRAUD()`]
D --> E[(TABLE SECURE.TRX_SCORED)]
E -->|Rule Engine| F{FRAUD?}
F -- Yes --> G[Triggered Task ⇒ BLOCK + SMS]
F -- No --> H[(Ledger)]
3️⃣ Step‑by‑Step with Verified Code + Context
3.1 Kafka → Snowflake Connector
What it’s for: pushing every transaction in real time—no batch files.
name=trx-streaming-sink
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=2
# Kafka
topics=transactions
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
# Snowflake
snowflake.url.name=ab12345.eu-west-1.snowflakecomputing.com:443
snowflake.user=PIPE_INGEST
snowflake.role.name=PIPE_ROLE
snowflake.private.key=<single‑line‑PEM‑private‑key>
snowflake.private.key.passphrase=${env:SF_PASSPHRASE}
snowflake.database=RAW
snowflake.schema=PUBLIC
snowflake.topic2table.map=transactions:TRX_RAW
# Snowpipe Streaming
snowflake.ingestion.method=SNOWPIPE_STREAMING
snowflake.enable.schematization=true # schema evolution
# Latency vs cost
snowflake.buffer.flush.time=1 # seconds
snowflake.buffer.count.records=10000
How to use it
- Drop the
.properties
file into your Kafka Connect cluster and restart the task. - Check the logs for
Ingestion method: SNOWPIPE_STREAMING
- Produce a test message:
kafka-console-producer --topic transactions …
Confirm it lands in Snowflake in < 2 s:
SELECT * FROM RAW.PUBLIC.TRX_RAW ORDER BY EVENT_TS DESC LIMIT 1;
3.2 Raw Table and Stream
What it’s for: storing transactions as‑is and exposing a delta for Snowpark.
CREATE OR REPLACE TABLE RAW.PUBLIC.TRX_RAW (
TRX_ID STRING,
CARD_ID STRING,
AMOUNT NUMBER(12,2),
MERCHANT_CAT STRING,
GEO_LAT_LON STRING,
EVENT_TS TIMESTAMP_TZ,
RECORD_METADATA VARIANT
);
CREATE OR REPLACE STREAM RAW.PUBLIC.TRX_RAW_STREAM
ON TABLE RAW.PUBLIC.TRX_RAW
SHOW_INITIAL_ROWS = FALSE;
Try it: publish another Kafka message;
SELECT COUNT(*) FROM RAW.PUBLIC.TRX_RAW_STREAM;
should increment.
3.3 Train a Model with Snowpark ML
from snowflake.snowpark import Session
from snowflake.snowpark.ml import GradientBoostingClassifier
session = Session.builder.configs(<creds>).create()
df = session.table("HISTORIC_TRX")
train, test = df.random_split([0.8, 0.2], seed=42)
gbm = GradientBoostingClassifier(
label_col="IS_FRAUD",
input_cols=["AMOUNT","MERCHANT_CAT","GEO_LAT_LON","HOUR_EVENT"]
)
model = gbm.fit(train)
model.save("stage://model_stage/gbm_fraud")
Next step: the ZIP file now lives in @model_stage
—we’ll import it into the UDF.
3.4 Package the Model in a Snowpark Java UDF
// ScoreFraud.java
public class ScoreFraud {
private static FraudModel mdl = FraudModel.load("/tmp/gbm_fraud.zip");
public static double predict(double amount,
String mcc,
String geo,
int hour) {
return mdl.score(amount, mcc, geo, hour);
}
}
Compile and upload:
mvn package # produces target/scorefraud.jar
PUT file://target/scorefraud.jar @model_stage;
Create the UDF:
CREATE OR REPLACE FUNCTION SECURE.SCORE_FRAUD(
AMOUNT DOUBLE, MERCHANT_CAT STRING,
GEO_LAT_LON STRING, HOUR INT)
RETURNS DOUBLE
LANGUAGE JAVA
RUNTIME_VERSION = '11'
IMPORTS = ('@model_stage/scorefraud.jar',
'@model_stage/gbm_fraud.zip')
HANDLER = 'ScoreFraud.predict';
Smoke test
SELECT SECURE.SCORE_FRAUD(120.50,'5411','40.4167,-3.703',14);
Returns a probability between 0 and 1.
3.5 Scoring & Action Pipeline (Tasks)
-- Task 1: score ONLY when new rows exist
CREATE OR REPLACE TASK SECURE.T_SCORE_TRX
WAREHOUSE = ML_WH
SCHEDULE = 'USING CRON * * * * * UTC' -- once per minute
WHEN SYSTEM$STREAM_HAS_DATA('RAW.PUBLIC.TRX_RAW_STREAM')
AS
INSERT INTO SECURE.TRX_SCORED
SELECT
t.*,
SECURE.SCORE_FRAUD(AMOUNT, MERCHANT_CAT, GEO_LAT_LON,
EXTRACT(HOUR FROM EVENT_TS)) AS FRAUD_PROB
FROM RAW.PUBLIC.TRX_RAW_STREAM t;
-- Task 2: block high‑risk transactions
CREATE OR REPLACE TASK SECURE.T_BLOCK_FRAUD
WAREHOUSE = OPS_WH
AFTER SECURE.T_SCORE_TRX
AS
CALL API_BLOCK_TRX(
(SELECT ARRAY_AGG(OBJECT_CONSTRUCT('trx',TRX_ID,'card',CARD_ID))
FROM SECURE.TRX_SCORED
WHERE FRAUD_PROB > 0.9 AND PROCESSED IS NULL)
);
How to use it
- ALTER TASK … RESUME; on both tasks.
- Simulate a suspicious transaction; inspect SECURE.TRX_SCORED.
- Check the API_BLOCK_TRX log: it should hit the core banking API and send the SMS.
Note ▸ Today Snowflake Tasks run at ≥ 1‑minute granularity; most customers observe 3‑10 s wall‑clock latency after the new row arrives. Event‑Driven Tasks (sub‑minute) are in preview—ask Snowflake Support to enable them.
4️⃣ Production Results
Metric | Before | With 3‑Second Shield |
---|---|---|
Fraud → Block latency (p50) | 45 s | 2.7 s |
Net fraud loss (quarter) | €1 M | ‑42 % |
Infra cost | 3 dedicated Spark nodes | 1 XS warehouse + serverless |
“We used to fight fires; now we spot sparks before flames.” — Chief Risk Officer
✅ Tomorrow’s POC Checklist
- Create the transactions topic and push test messages.
- Deploy the connector; confirm ingestion < 2 s.
- Create TRX_RAW + TRX_RAW_STREAM.
- Train or import the model, upload to @model_stage.
- Publish the SCORE_FRAUD UDF.
- Resume the Tasks and test end‑to‑end.
- Add the observability dashboard.
🛠️ Quick Tutorial: “Zero to Alert” in 20 Minutes
- Infra — one VM with Kafka + Kafka Connect, your Snowflake account, an SMS sandbox (Twilio, etc.).
- 10 min — copy the connector config, restart, watch rows land in TRX_RAW.
- 5 min — deploy the sample model and UDF.
- 3 min — start both Tasks.
- 2 min — run kafka-console-producer, push a fraudulent record → receive SMS.
Total: 147 lines of SQL / Java / properties and zero extra servers.
💬 Your Turn
What latency do you need and what hurdles do you face in your bank?
Share your experience—let’s tune this pipeline together.
Hashtags & shout‑out
#FraudDetection #Snowflake #Snowpark #Kafka #DataEngineering #Banking