# Input streams read by the pipeline.
sources:
  # Payment transaction events consumed from Kafka as JSON.
  payments.transactions:
    type: kafka
    topic: payments.transactions
    data_format: json
    # Each event is keyed by its transaction id.
    primary_key: transaction_id
    # Declared column types for the decoded JSON events.
    schema:
      transaction_id: string
      # NOTE(review): declared as a plain string, not a timestamp type —
      # confirm the expected format with the producer.
      event_time: string
      # The failed_payment_alerts transform keeps only rows with status = 'failed'.
      status: string
      failure_code: string
      failure_message: string
      # Compared against 100 in failed_payment_alerts without regard to currency.
      # NOTE(review): how decimal128 is surfaced to the TypeScript transform
      # (number vs. string) is not visible here — confirm.
      amount: decimal128
      currency: string
      customer_id: string
      customer_tier: string
      merchant: string
      payment_method: string
      card_country: string
      ip_country: string
      # Values >= 80 are treated as high risk by the transforms in this file.
      risk_score: int64
      # Rendered as a clickable link in the Slack message.
      checkout_url: string
      trace_id: string

transforms:
  # Selects the failed payments that deserve an alert. A failed row qualifies
  # when at least one of these holds:
  #   - the failure code is 'suspected_fraud' or 'card_declined'
  #   - the risk score is 80 or higher
  #   - the amount is 100 or higher (no currency conversion is applied)
  failed_payment_alerts:
    type: sql
    primary_key: transaction_id
    sql: |
      SELECT *
      FROM payments.transactions
      WHERE status = 'failed'
        AND (
          failure_code IN ('suspected_fraud', 'card_declined')
          OR risk_score >= 80
          OR amount >= 100
        )

  # Converts each row of failed_payment_alerts into a Slack message row by
  # running the TypeScript `process` function below.
  slack_messages:
    type: script
    from: failed_payment_alerts
    language: typescript
    # `id` is set from the input row's transaction_id by the script.
    primary_key: id
    # Columns of the object returned by `process`.
    schema:
      id: string
      # Single-line summary of the failure.
      text: string
      # JSON-encoded array of Slack block objects (the script returns the
      # result of JSON.stringify, hence the string type).
      blocks: string
    script: |
      /**
       * Turns one failed-payment alert row into a Slack message row.
       *
       * Every value taken from the input is escaped before it is placed in
       * Slack mrkdwn, and missing (null/undefined) fields degrade to empty or
       * placeholder text instead of throwing or rendering "undefined".
       *
       * @param input Row from `failed_payment_alerts`.
       * @returns `{ id, text, blocks }`: `id` is the raw transaction id, `text`
       *          is the one-line summary, and `blocks` is a JSON-encoded array
       *          of Slack blocks.
       */
      function process(input: any) {
        /** Escapes the three characters Slack treats as control characters. */
        function escapeMrkdwn(value: unknown): string {
          return String(value ?? "")
            .replace(/&/g, "&amp;")
            .replace(/</g, "&lt;")
            .replace(/>/g, "&gt;");
        }

        /**
         * Formats an amount as "<currency> <amount with 2 decimals>".
         * Accepts numbers and numeric strings (a decimal column may not arrive
         * as a JS number); anything non-numeric is shown as-is, or "n/a" when
         * absent, rather than throwing on `.toFixed`.
         */
        function money(amount: unknown, currency: unknown): string {
          const raw = String(amount ?? "").trim();
          const numeric = raw === "" ? NaN : Number(raw);
          const shown = Number.isFinite(numeric) ? numeric.toFixed(2) : raw || "n/a";
          const code = String(currency ?? "").trim();
          return code ? `${code} ${shown}` : shown;
        }

        /**
         * Builds a Slack `<url|label>` link. Returns "" when there is no URL,
         * and plain escaped text when the value is not an http(s) URL, so a bad
         * value can never produce a broken or misleading link.
         */
        function slackLink(url: unknown, label: string): string {
          const href = String(url ?? "").trim();
          if (href === "") {
            return "";
          }
          if (!/^https?:\/\//i.test(href)) {
            return escapeMrkdwn(href);
          }
          // `|` and `>` would terminate the link early, so percent-encode them.
          const safeHref = href
            .replace(/&/g, "&amp;")
            .replace(/</g, "%3C")
            .replace(/>/g, "%3E")
            .replace(/\|/g, "%7C")
            .replace(/\s/g, "%20");
          return `<${safeHref}|${label}>`;
        }

        const highRisk = input.risk_score >= 80;
        const title = highRisk
          ? ":rotating_light: High-risk payment failure"
          : ":warning: Payment failure";

        const transactionId = escapeMrkdwn(input.transaction_id);
        const amountText = escapeMrkdwn(money(input.amount, input.currency));

        const blocks: any[] = [
          {
            type: "section",
            text: {
              type: "mrkdwn",
              text:
                `*${title}*\n` +
                `Transaction *${transactionId}* failed at ${escapeMrkdwn(input.merchant)}.`
            }
          },
          {
            type: "section",
            fields: [
              {
                type: "mrkdwn",
                text: `*Amount*\n${amountText}`
              },
              {
                type: "mrkdwn",
                text: `*Failure*\n${escapeMrkdwn(input.failure_code)}`
              },
              {
                type: "mrkdwn",
                text: `*Customer*\n${escapeMrkdwn(input.customer_id)} (${escapeMrkdwn(input.customer_tier)})`
              },
              {
                type: "mrkdwn",
                text: `*Risk score*\n${escapeMrkdwn(input.risk_score)}`
              },
              {
                type: "mrkdwn",
                text: `*Card / IP country*\n${escapeMrkdwn(input.card_country)} / ${escapeMrkdwn(input.ip_country)}`
              },
              {
                type: "mrkdwn",
                text: `*Trace ID*\n${escapeMrkdwn(input.trace_id)}`
              }
            ]
          }
        ];

        // Only add the context block when there is something to show in it;
        // an empty text element is not a valid block.
        const contextParts = [
          escapeMrkdwn(input.failure_message),
          slackLink(input.checkout_url, "Open checkout")
        ].filter((part) => part !== "");
        if (contextParts.length > 0) {
          blocks.push({
            type: "context",
            elements: [
              {
                type: "mrkdwn",
                text: contextParts.join(" | ")
              }
            ]
          });
        }

        return {
          // The id is a key, not display text, so it is passed through unescaped.
          id: input.transaction_id,
          text: `${title}: ${transactionId} failed for ${amountText}`,
          blocks: JSON.stringify(blocks)
        };
      }

# Destinations that receive transform output.
sinks:
  # Sends the rows produced by slack_messages to a Slack webhook URL.
  slack.failed_payment_alerts:
    type: webhook
    from: slack_messages
    # Placeholder: <REST_OF_URL> must be replaced with the real webhook path
    # before deploying. NOTE(review): the full webhook URL acts as a credential;
    # prefer injecting it from a secret or environment reference if the
    # pipeline tool supports one, rather than committing it to this file.
    url: "https://hooks.slack.com/services/<REST_OF_URL>"
    # Presumably delivers each row as its own HTTP request (one Slack message
    # per alert) — confirm against the pipeline tool's documentation.
    one_row_per_request: true
