Skip to main content

02:17 a.m. — A €4 999 withdrawal request pops up at an ATM in a country the cardholder has never visited. Three seconds later the transaction is blocked, the customer gets an SMS and the fraud attempt is over. That’s how FinBank launched “3‑Second Shield”: a mixed squad of Data Engineers and analysts that moved fraud scoring inside Snowflake with Snowpark and a Kafka stream.


1️⃣ The Business Challenge

In banking, every second costs money —and reputation. Legacy rules engines took 30 s – 2 min to approve or deny a payment, giving fraudsters time to swipe again. The new target was < 3 s TTL (time‑to‑lock) from the moment a message appears in Kafka to the moment the core system decides.


2️⃣ High‑Level Architecture (visual first)

graph LR
    subgraph Real‑time Payment
        A[<< ISO‑8583 msg >>] --> B(Kafka topic `transactions`)
    end
    subgraph Ingest
        B -->|Kafka Connector 2.4 +<br/>Snowpipe Streaming| C[(TABLE RAW.TRX_RAW)]
    end
    C -->|Stream| D>Snowpark UDF<br/>`SCORE_FRAUD()`]
    D --> E[(TABLE SECURE.TRX_SCORED)]
    E -->|Rule Engine| F{FRAUD?}
    F -- Yes --> G[Triggered Task ⇒ BLOCK + SMS]
    F -- No --> H[(Ledger)]

3️⃣ Step‑by‑Step with Verified Code + Context

3.1 Kafka → Snowflake Connector

What it’s for: pushing every transaction in real time—no batch files.

name=trx-streaming-sink
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=2

# Kafka
topics=transactions
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

# Snowflake
snowflake.url.name=ab12345.eu-west-1.snowflakecomputing.com:443
snowflake.user=PIPE_INGEST
snowflake.role.name=PIPE_ROLE
snowflake.private.key=<single‑line‑PEM‑private‑key>
snowflake.private.key.passphrase=${env:SF_PASSPHRASE}

snowflake.database=RAW
snowflake.schema=PUBLIC
snowflake.topic2table.map=transactions:TRX_RAW

# Snowpipe Streaming
snowflake.ingestion.method=SNOWPIPE_STREAMING
snowflake.enable.schematization=true        # schema evolution

# Latency vs cost
snowflake.buffer.flush.time=1               # seconds
snowflake.buffer.count.records=10000

How to use it

  1. Drop the .properties file into your Kafka Connect cluster and restart the task.
  2. Check the logs for Ingestion method: SNOWPIPE_STREAMING
  3. Produce a test message:
    kafka-console-producer --topic transactions …
    

    Confirm it lands in Snowflake in < 2 s:

    SELECT * FROM RAW.PUBLIC.TRX_RAW ORDER BY EVENT_TS DESC LIMIT 1;
    

3.2 Raw Table and Stream

What it’s for: storing transactions as‑is and exposing a delta for Snowpark.

CREATE OR REPLACE TABLE RAW.PUBLIC.TRX_RAW (
  TRX_ID         STRING,
  CARD_ID        STRING,
  AMOUNT         NUMBER(12,2),
  MERCHANT_CAT   STRING,
  GEO_LAT_LON    STRING,
  EVENT_TS       TIMESTAMP_TZ,
  RECORD_METADATA VARIANT
);

CREATE OR REPLACE STREAM RAW.PUBLIC.TRX_RAW_STREAM
  ON TABLE RAW.PUBLIC.TRX_RAW
  SHOW_INITIAL_ROWS = FALSE;

Try it: publish another Kafka message; SELECT COUNT(*) FROM RAW.PUBLIC.TRX_RAW_STREAM; should increment.


3.3 Train a Model with Snowpark ML

from snowflake.snowpark import Session
from snowflake.snowpark.ml import GradientBoostingClassifier

session = Session.builder.configs(<creds>).create()

df = session.table("HISTORIC_TRX")
train, test = df.random_split([0.8, 0.2], seed=42)

gbm = GradientBoostingClassifier(
          label_col="IS_FRAUD",
          input_cols=["AMOUNT","MERCHANT_CAT","GEO_LAT_LON","HOUR_EVENT"]
      )
model = gbm.fit(train)
model.save("stage://model_stage/gbm_fraud")

Next step: the ZIP file now lives in @model_stage—we’ll import it into the UDF.


3.4 Package the Model in a Snowpark Java UDF

// ScoreFraud.java
public class ScoreFraud {
  private static FraudModel mdl = FraudModel.load("/tmp/gbm_fraud.zip");
  public static double predict(double amount,
                               String mcc,
                               String geo,
                               int hour) {
      return mdl.score(amount, mcc, geo, hour);
  }
}

Compile and upload:

mvn package                       # produces target/scorefraud.jar
PUT file://target/scorefraud.jar @model_stage;

Create the UDF:

CREATE OR REPLACE FUNCTION SECURE.SCORE_FRAUD(
        AMOUNT DOUBLE, MERCHANT_CAT STRING,
        GEO_LAT_LON STRING, HOUR INT)
RETURNS DOUBLE
LANGUAGE JAVA
RUNTIME_VERSION = '11'
IMPORTS = ('@model_stage/scorefraud.jar',
           '@model_stage/gbm_fraud.zip')
HANDLER = 'ScoreFraud.predict';

Smoke test

SELECT SECURE.SCORE_FRAUD(120.50,'5411','40.4167,-3.703',14);

Returns a probability between 0 and 1.


3.5 Scoring & Action Pipeline (Tasks)

-- Task 1: score ONLY when new rows exist
CREATE OR REPLACE TASK SECURE.T_SCORE_TRX
  WAREHOUSE = ML_WH
  SCHEDULE  = 'USING CRON * * * * * UTC'      -- once per minute
  WHEN      SYSTEM$STREAM_HAS_DATA('RAW.PUBLIC.TRX_RAW_STREAM')
AS
INSERT INTO SECURE.TRX_SCORED
SELECT
  t.*,
  SECURE.SCORE_FRAUD(AMOUNT, MERCHANT_CAT, GEO_LAT_LON,
                     EXTRACT(HOUR FROM EVENT_TS)) AS FRAUD_PROB
FROM RAW.PUBLIC.TRX_RAW_STREAM t;

-- Task 2: block high‑risk transactions
CREATE OR REPLACE TASK SECURE.T_BLOCK_FRAUD
  WAREHOUSE = OPS_WH
  AFTER SECURE.T_SCORE_TRX
AS
CALL API_BLOCK_TRX(
  (SELECT ARRAY_AGG(OBJECT_CONSTRUCT('trx',TRX_ID,'card',CARD_ID))
     FROM SECURE.TRX_SCORED
     WHERE FRAUD_PROB > 0.9 AND PROCESSED IS NULL)
);

How to use it

  1. ALTER TASK … RESUME; on both tasks.
  2. Simulate a suspicious transaction; inspect SECURE.TRX_SCORED.
  3. Check the API_BLOCK_TRX log: it should hit the core banking API and send the SMS.

Note ▸ Today Snowflake Tasks run at ≥ 1‑minute granularity; most customers observe 3‑10 s wall‑clock latency after the new row arrives. Event‑Driven Tasks (sub‑minute) are in preview—ask Snowflake Support to enable them.


4️⃣ Production Results

Metric Before With 3‑Second Shield
Fraud → Block latency (p50) 45 s 2.7 s
Net fraud loss (quarter) €1 M ‑42 %
Infra cost 3 dedicated Spark nodes 1 XS warehouse + serverless

“We used to fight fires; now we spot sparks before flames.” — Chief Risk Officer


✅ Tomorrow’s POC Checklist

  1. Create the transactions topic and push test messages.
  2. Deploy the connector; confirm ingestion < 2 s.
  3. Create TRX_RAW + TRX_RAW_STREAM.
  4. Train or import the model, upload to @model_stage.
  5. Publish the SCORE_FRAUD UDF.
  6. Resume the Tasks and test end‑to‑end.
  7. Add the observability dashboard.

🛠️ Quick Tutorial: “Zero to Alert” in 20 Minutes

  1. Infra — one VM with Kafka + Kafka Connect, your Snowflake account, an SMS sandbox (Twilio, etc.).
  2. 10 min — copy the connector config, restart, watch rows land in TRX_RAW.
  3. 5 min — deploy the sample model and UDF.
  4. 3 min — start both Tasks.
  5. 2 min — run kafka-console-producer, push a fraudulent record → receive SMS.

Total: 147 lines of SQL / Java / properties and zero extra servers.


💬 Your Turn

What latency do you need and what hurdles do you face in your bank?
Share your experience—let’s tune this pipeline together.


Hashtags & shout‑out

#FraudDetection #Snowflake #Snowpark #Kafka #DataEngineering #Banking

Leave a Reply