빅쿼리 데이터폼

Dataform 데이터 파이프라인

목차

  1. Dataform 개요
  2. 주요 개념
  3. 설치 및 설정
  4. 프로젝트 구조
  5. 테이블 정의
  6. 변환 작업
  7. 의존성 관리
  8. 테스트
  9. 버전 관리 및 배포
  10. 모니터링 및 디버깅
  11. 실제 예제
  12. 베스트 프랙티스
  13. 사용 요금 및 비용 최적화
  14. 트러블슈팅

Dataform 개요

Dataform이란?

  • Google Cloud에서 제공하는 데이터 변환 워크플로우 관리 도구
  • SQL 기반으로 복잡한 데이터 파이프라인을 코드로 관리
  • BigQuery와 완전 통합되어 스케일러블한 데이터 변환 지원
  • 버전 관리, 의존성 관리, 테스팅 등 소프트웨어 개발 모범 사례 적용

주요 특징

  • 선언적 SQL 기반: 복잡한 데이터 변환을 간단한 SQL로 정의
  • 의존성 자동 관리: 테이블 간 의존성을 자동으로 파악하여 올바른 순서로 실행
  • 증분 처리: 새로운 데이터만 처리하여 비용 및 시간 최적화
  • 테스트 기능: 데이터 품질 검증을 위한 단위 테스트 지원
  • Git 통합: 소스 코드 버전 관리 및 협업 지원

사용 사례

  • ETL/ELT 파이프라인: 원시 데이터를 분석 가능한 형태로 변환
  • 데이터 웨어하우스 구축: 차원 모델링 및 데이터 마트 생성
  • 데이터 품질 관리: 일관성 있는 데이터 검증 및 정제
  • 리포팅 데이터 준비: 비즈니스 인텔리전스 도구용 데이터 가공

주요 개념

1. Repository (저장소)

# dataform.json
{
  "defaultSchema": "dataform_staging",
  "assertionSchema": "dataform_assertions",
  "defaultDatabase": "my-project",
  "defaultLocation": "US"
}

2. Workflow (워크플로우)

  • 데이터 변환 작업의 실행 단위
  • 의존성에 따른 자동 실행 순서 결정
  • 병렬 실행으로 성능 최적화

3. Actions (액션)

Table

-- tables/dim_customers.sqlx
config {
  type: "table",
  schema: "analytics",
  description: "고객 차원 테이블"
}

SELECT 
  customer_id,
  customer_name,
  email,
  registration_date,
  last_order_date
FROM ${ref("raw_customers")}
WHERE customer_id IS NOT NULL

View

-- definitions/vw_sales_summary.sqlx
config {
  type: "view",
  description: "매출 요약 뷰"
}

SELECT 
  DATE(order_date) as order_date,
  SUM(total_amount) as daily_sales,
  COUNT(DISTINCT order_id) as order_count
FROM ${ref("fact_orders")}
GROUP BY DATE(order_date)

Incremental Table

-- tables/fact_orders_incremental.sqlx
config {
  type: "incremental",
  uniqueKey: ["order_id"],
  updatePartitionFilter: "order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)"
}

SELECT 
  order_id,
  customer_id,
  order_date,
  total_amount,
  CURRENT_TIMESTAMP() as processed_at
FROM ${ref("raw_orders")}

${ when(incremental(), `WHERE order_date >= (SELECT MAX(order_date) FROM ${self()})`) }

Assertion (검증)

-- definitions/assertions/assert_positive_amounts.sqlx
config {
  type: "assertion",
  description: "주문 금액이 음수가 아닌지 확인"
}

SELECT *
FROM ${ref("fact_orders")}
WHERE total_amount < 0

4. 의존성 참조

-- ref() 함수: 동일 프로젝트 내 테이블 참조
SELECT * FROM ${ref("source_table")}

-- resolve() 함수: 외부 테이블 참조  
SELECT * FROM ${resolve("external_dataset.external_table")}

-- self() 함수: 증분 테이블에서 자기 자신 참조
WHERE date > (SELECT MAX(date) FROM ${self()})

설치 및 설정

1. Google Cloud Console에서 Dataform 활성화

API 활성화

gcloud services enable dataform.googleapis.com

IAM 권한 설정

# 서비스 계정 생성
gcloud iam service-accounts create dataform-sa \
    --description="Dataform service account" \
    --display-name="Dataform SA"

# BigQuery 권한 부여
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:dataform-sa@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/bigquery.dataEditor"

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:dataform-sa@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/bigquery.jobUser"

2. 로컬 개발 환경 설정

Dataform CLI 설치

npm install -g @dataform/cli

프로젝트 초기화

# 새 프로젝트 생성
dataform init my-dataform-project
cd my-dataform-project

# Google Cloud 인증 설정
gcloud auth application-default login

개발 환경 구성

// .df/profiles.json
{
  "dev": {
    "projectId": "my-project-dev",
    "location": "US"
  },
  "prod": {
    "projectId": "my-project-prod", 
    "location": "US"
  }
}

3. Google Cloud Console에서 저장소 생성

저장소 연결 설정

  1. Dataform 콘솔 접속
  2. “Create repository” 클릭
  3. Git 저장소와 연결 또는 새 저장소 생성
  4. 기본 브랜치 및 설정 구성

프로젝트 구조

표준 디렉토리 구조

dataform-project/
├── .df/
│   └── profiles.json          # 환경별 프로필 설정
├── definitions/               # 모든 정의 파일들
│   ├── staging/              # 스테이징 영역
│   │   ├── stg_orders.sqlx
│   │   └── stg_customers.sqlx
│   ├── marts/                # 데이터 마트
│   │   ├── dim_customers.sqlx
│   │   └── fact_orders.sqlx
│   ├── assertions/           # 데이터 검증
│   │   ├── assert_unique_customers.sqlx
│   │   └── assert_positive_amounts.sqlx
│   └── sources/              # 소스 데이터 정의
├── includes/                 # JavaScript 함수들
│   ├── constants.js
│   └── macros.js
├── dataform.json             # 프로젝트 설정
└── package.json              # Node.js 의존성

파일 명명 규칙

-- 스테이징: stg_[source]_[entity].sqlx
-- definitions/staging/stg_ecommerce_orders.sqlx

-- 차원 테이블: dim_[entity].sqlx  
-- definitions/marts/dim_customers.sqlx

-- 팩트 테이블: fact_[process].sqlx
-- definitions/marts/fact_sales.sqlx

-- 어서션: assert_[condition].sqlx
-- definitions/assertions/assert_unique_order_ids.sqlx

설정 파일 관리

// dataform.json
{
  "defaultSchema": "analytics",
  "assertionSchema": "data_quality",
  "defaultDatabase": "my-bigquery-project",
  "defaultLocation": "US",
  "vars": {
    "start_date": "2023-01-01",
    "source_dataset": "raw_data"
  }
}

테이블 정의

1. 기본 테이블 생성

단순 테이블

-- definitions/marts/dim_products.sqlx
config {
  type: "table",
  schema: "analytics",
  description: "제품 차원 테이블 - 모든 제품 정보를 포함",
  columns: {
    product_id: "제품 고유 식별자",
    product_name: "제품명",
    category: "제품 카테고리",
    price: "제품 가격",
    created_at: "레코드 생성 시간"
  },
  bigquery: {
    partitionBy: "DATE(created_at)",
    clusterBy: ["category", "product_id"]
  }
}

SELECT 
  p.product_id,
  p.product_name,
  c.category_name as category,
  p.price,
  p.created_at,
  CURRENT_TIMESTAMP() as processed_at
FROM ${ref("raw_products")} p
JOIN ${ref("raw_categories")} c
  ON p.category_id = c.category_id
WHERE p.is_active = true

파티셔닝 및 클러스터링

-- definitions/marts/fact_sales_daily.sqlx
config {
  type: "table",
  bigquery: {
    partitionBy: "DATE(order_date)",
    partitionExpirationDays: 365,
    clusterBy: ["customer_id", "product_id"],
    requirePartitionFilter: true
  }
}

SELECT 
  DATE(order_date) as order_date,
  customer_id,
  product_id,
  SUM(quantity) as total_quantity,
  SUM(total_amount) as total_sales,
  COUNT(DISTINCT order_id) as order_count
FROM ${ref("stg_orders")}
WHERE DATE(order_date) >= '2023-01-01'
GROUP BY 1, 2, 3

2. 증분 테이블 (Incremental)

기본 증분 처리

-- definitions/staging/stg_events_incremental.sqlx
config {
  type: "incremental",
  uniqueKey: ["event_id"],
  bigquery: {
    partitionBy: "DATE(event_timestamp)",
    updatePartitionFilter: "DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)"
  }
}

SELECT 
  event_id,
  user_id,
  event_type,
  event_timestamp,
  event_properties,
  CURRENT_TIMESTAMP() as processed_at
FROM ${ref("raw_events")}

${ when(incremental(), `
WHERE event_timestamp > (
  SELECT COALESCE(MAX(event_timestamp), TIMESTAMP('1900-01-01'))
  FROM ${self()}
)
`) }

삭제/업데이트 처리 (Delete+Insert)

-- definitions/marts/dim_customers_scd2.sqlx
config {
  type: "incremental",
  uniqueKey: ["customer_id", "effective_date"],
  bigquery: {
    updatePartitionFilter: "effective_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)"
  }
}

WITH source_data AS (
  SELECT 
    customer_id,
    customer_name,
    email,
    address,
    phone,
    updated_at as effective_date,
    LEAD(updated_at) OVER (PARTITION BY customer_id ORDER BY updated_at) as end_date,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC) as rn
  FROM ${ref("stg_customers")}
  ${ when(incremental(), `
  WHERE updated_at > (
    SELECT COALESCE(MAX(effective_date), DATE('1900-01-01'))
    FROM ${self()}
  )
  `) }
)

SELECT 
  customer_id,
  customer_name,
  email,
  address,
  phone,
  effective_date,
  COALESCE(end_date, DATE('9999-12-31')) as end_date,
  CASE WHEN end_date IS NULL THEN true ELSE false END as is_current
FROM source_data

3. 뷰 정의

단순 뷰

-- definitions/marts/vw_customer_metrics.sqlx
config {
  type: "view",
  description: "고객별 주요 지표 뷰"
}

SELECT 
  c.customer_id,
  c.customer_name,
  COUNT(DISTINCT o.order_id) as total_orders,
  SUM(o.total_amount) as total_spent,
  AVG(o.total_amount) as avg_order_value,
  MIN(o.order_date) as first_order_date,
  MAX(o.order_date) as last_order_date,
  DATE_DIFF(CURRENT_DATE(), MAX(o.order_date), DAY) as days_since_last_order
FROM ${ref("dim_customers")} c
LEFT JOIN ${ref("fact_orders")} o
  ON c.customer_id = o.customer_id
GROUP BY 1, 2

물리화된 뷰 (Materialized View)

-- definitions/marts/mv_daily_sales.sqlx
config {
  type: "view",
  materialized: true,
  bigquery: {
    partitionBy: "DATE(order_date)",
    clusterBy: ["region", "category"]
  }
}

SELECT 
  DATE(order_date) as order_date,
  p.category,
  c.region,
  COUNT(DISTINCT o.order_id) as order_count,
  COUNT(DISTINCT o.customer_id) as customer_count,
  SUM(o.total_amount) as total_sales,
  AVG(o.total_amount) as avg_order_value
FROM ${ref("fact_orders")} o
JOIN ${ref("dim_products")} p ON o.product_id = p.product_id
JOIN ${ref("dim_customers")} c ON o.customer_id = c.customer_id
WHERE DATE(order_date) >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY)
GROUP BY 1, 2, 3

변환 작업

1. JavaScript 매크로 활용

공통 매크로 정의

// includes/macros.js

// 날짜 범위 필터 매크로
function dateFilter(column, days_back = 30) {
  return `${column} >= DATE_SUB(CURRENT_DATE(), INTERVAL ${days_back} DAY)`;
}

// Pivot 매크로
function pivot(column, values, agg_func = 'SUM', agg_column = '*') {
  const pivotColumns = values.map(value => 
    `${agg_func}(CASE WHEN ${column} = '${value}' THEN ${agg_column} END) as ${value}`
  ).join(',\n  ');
  return pivotColumns;
}

// 안전한 나눗셈 매크로
function safeDivide(numerator, denominator) {
  return `SAFE_DIVIDE(${numerator}, ${denominator})`;
}

// 이메일 마스킹 매크로
function maskEmail(email_column) {
  return `
    CONCAT(
      LEFT(${email_column}, 3),
      '***',
      SUBSTR(${email_column}, STRPOS(${email_column}, '@'))
    )
  `;
}

module.exports = {
  dateFilter,
  pivot,
  safeDivide,
  maskEmail
};

매크로 사용 예제

-- definitions/marts/fact_sales_pivot.sqlx
config { type: "table" }

SELECT 
  order_date,
  customer_id,
  ${pivot('payment_method', ['credit_card', 'debit_card', 'cash'], 'SUM', 'total_amount')}
FROM ${ref("fact_orders")}
WHERE ${dateFilter('order_date', 90)}
GROUP BY order_date, customer_id

2. 조건부 로직

환경별 분기 처리

-- definitions/staging/stg_orders.sqlx
config {
  type: "table",
  schema: dataform.projectConfig.vars.target_schema
}

SELECT 
  order_id,
  customer_id,
  order_date,
  total_amount,
  
  -- 개발환경에서는 데이터 샘플링
  ${ when(dataform.projectConfig.vars.environment === 'dev', `
  FROM ${ref("raw_orders")}
  WHERE MOD(ABS(FARM_FINGERPRINT(CAST(order_id AS STRING))), 100) < 10
  `) }
  
  ${ when(dataform.projectConfig.vars.environment !== 'dev', `
  FROM ${ref("raw_orders")}
  `) }

점진적 배포

-- definitions/marts/fact_orders_v2.sqlx
config {
  type: "table",
  disabled: dataform.projectConfig.vars.enable_v2 !== true
}

-- 새로운 버전의 테이블 정의
SELECT 
  order_id,
  customer_id,
  order_date,
  total_amount,
  -- 새 컬럼 추가
  discount_amount,
  tax_amount,
  net_amount
FROM ${ref("stg_orders")}

3. 복잡한 변환 로직

윈도우 함수 활용

-- definitions/marts/customer_cohort_analysis.sqlx
config {
  type: "table",
  description: "고객 코호트 분석 테이블"
}

WITH customer_orders AS (
  SELECT 
    customer_id,
    order_date,
    total_amount,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date) as order_number,
    MIN(order_date) OVER (PARTITION BY customer_id) as first_order_date
  FROM ${ref("fact_orders")}
),

cohort_data AS (
  SELECT 
    customer_id,
    DATE_TRUNC(first_order_date, MONTH) as cohort_month,
    DATE_DIFF(DATE_TRUNC(order_date, MONTH), DATE_TRUNC(first_order_date, MONTH), MONTH) as period_number,
    total_amount
  FROM customer_orders
)

SELECT 
  cohort_month,
  period_number,
  COUNT(DISTINCT customer_id) as customers,
  SUM(total_amount) as revenue,
  AVG(total_amount) as avg_revenue_per_customer
FROM cohort_data
GROUP BY cohort_month, period_number
ORDER BY cohort_month, period_number

JSON 데이터 처리

-- definitions/staging/stg_events_parsed.sqlx
config { type: "table" }

SELECT 
  event_id,
  user_id,
  event_timestamp,
  event_type,
  
  -- JSON 속성 파싱
  JSON_EXTRACT_SCALAR(event_properties, '$.page_url') as page_url,
  JSON_EXTRACT_SCALAR(event_properties, '$.referrer') as referrer,
  CAST(JSON_EXTRACT_SCALAR(event_properties, '$.session_duration') AS INT64) as session_duration,
  
  -- 중첩된 JSON 배열 처리
  ARRAY(
    SELECT JSON_EXTRACT_SCALAR(item, '$.product_id')
    FROM UNNEST(JSON_EXTRACT_ARRAY(event_properties, '$.products')) as item
  ) as product_ids,
  
  -- 조건부 JSON 파싱
  CASE event_type
    WHEN 'purchase' THEN CAST(JSON_EXTRACT_SCALAR(event_properties, '$.amount') AS FLOAT64)
    ELSE NULL
  END as purchase_amount

FROM ${ref("raw_events")}
WHERE JSON_EXTRACT_SCALAR(event_properties, '$.valid') = 'true'

의존성 관리

1. 의존성 그래프 이해

ref() 함수를 통한 의존성 정의

-- definitions/marts/customer_lifetime_value.sqlx
config {
  type: "table",
  dependencies: ["dim_customers", "fact_orders"] -- 명시적 의존성 선언
}

SELECT 
  c.customer_id,
  c.customer_name,
  c.acquisition_date,
  
  -- 의존하는 테이블들 참조
  o.total_orders,
  o.total_spent,
  o.avg_order_value,
  
  -- CLV 계산
  o.total_spent * 1.2 as predicted_ltv
  
FROM ${ref("dim_customers")} c
JOIN (
  SELECT 
    customer_id,
    COUNT(*) as total_orders,
    SUM(total_amount) as total_spent,
    AVG(total_amount) as avg_order_value
  FROM ${ref("fact_orders")}
  GROUP BY customer_id
) o ON c.customer_id = o.customer_id

2. 순환 의존성 해결

Pre-hook를 사용한 임시 테이블

-- definitions/staging/stg_customer_metrics.sqlx
config {
  type: "table",
  preOps: [`
    CREATE OR REPLACE TABLE ${self()}_temp AS
    SELECT customer_id, COUNT(*) as order_count
    FROM ${ref("raw_orders")}
    GROUP BY customer_id
  `]
}

SELECT 
  c.*,
  COALESCE(t.order_count, 0) as total_orders
FROM ${ref("raw_customers")} c
LEFT JOIN ${self()}_temp t
  ON c.customer_id = t.customer_id

3. 조건부 실행

특정 조건에서만 실행

-- definitions/marts/weekend_sales_report.sqlx
config {
  type: "table",
  disabled: "${new Date().getDay() !== 0 && new Date().getDay() !== 6}" -- 주말에만 실행
}

SELECT 
  DATE(order_date) as weekend_date,
  SUM(total_amount) as weekend_sales
FROM ${ref("fact_orders")}
WHERE EXTRACT(DAYOFWEEK FROM order_date) IN (1, 7) -- 일요일, 토요일
  AND DATE(order_date) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY DATE(order_date)

데이터 존재 여부 확인

// includes/utils.js
function tableHasData(tableName) {
  return `(SELECT COUNT(*) FROM ${tableName}) > 0`;
}

module.exports = { tableHasData };
-- definitions/marts/incremental_report.sqlx
config {
  type: "table",
  disabled: !${tableHasData(ref("stg_daily_updates"))}
}

SELECT * FROM ${ref("stg_daily_updates")}

테스트

1. 데이터 검증 (Assertions)

기본 어서션

-- definitions/assertions/assert_unique_customer_ids.sqlx
config {
  type: "assertion",
  description: "고객 ID가 중복되지 않는지 확인"
}

SELECT customer_id
FROM ${ref("dim_customers")}
GROUP BY customer_id
HAVING COUNT(*) > 1

복잡한 비즈니스 룰 검증

-- definitions/assertions/assert_order_business_rules.sqlx
config {
  type: "assertion",
  description: "주문 데이터 비즈니스 룰 검증"
}

SELECT 
  order_id,
  'negative_amount' as violation_type,
  total_amount
FROM ${ref("fact_orders")}
WHERE total_amount < 0

UNION ALL

SELECT 
  order_id,
  'future_date' as violation_type,
  order_date
FROM ${ref("fact_orders")}
WHERE DATE(order_date) > CURRENT_DATE()

UNION ALL

SELECT 
  order_id,
  'missing_customer' as violation_type,
  customer_id
FROM ${ref("fact_orders")} o
LEFT JOIN ${ref("dim_customers")} c
  ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL

데이터 품질 메트릭

-- definitions/assertions/data_quality_metrics.sqlx
config {
  type: "assertion",
  description: "데이터 품질 임계값 검증"
}

WITH quality_metrics AS (
  SELECT 
    'customers' as table_name,
    COUNT(*) as total_rows,
    COUNT(DISTINCT customer_id) as unique_customers,
    COUNTIF(email IS NULL OR email = '') as missing_emails,
    COUNTIF(NOT REGEXP_CONTAINS(email, r'^[^@]+@[^@]+\.[^@]+$')) as invalid_emails
  FROM ${ref("dim_customers")}
)

SELECT *
FROM quality_metrics
WHERE 
  -- 중복률이 5% 초과
  (total_rows - unique_customers) / total_rows > 0.05
  -- 이메일 누락률이 10% 초과  
  OR missing_emails / total_rows > 0.10
  -- 유효하지 않은 이메일이 2% 초과
  OR invalid_emails / total_rows > 0.02

2. 단위 테스트

JavaScript를 사용한 테스트 유틸리티

// includes/test_utils.js

function createTestData(tableName, data) {
  const columns = Object.keys(data[0]).join(', ');
  const values = data.map(row => 
    `(${Object.values(row).map(v => typeof v === 'string' ? `'${v}'` : v).join(', ')})`
  ).join(', ');
  
  return `
    CREATE OR REPLACE TABLE ${tableName} (${columns}) AS
    SELECT * FROM UNNEST([
      ${values}
    ])
  `;
}

function assertEqual(actual, expected, message) {
  return `
    SELECT 
      '${message}' as test_name,
      ${actual} as actual_value,
      ${expected} as expected_value,
      CASE WHEN ${actual} = ${expected} THEN 'PASS' ELSE 'FAIL' END as result
  `;
}

module.exports = { createTestData, assertEqual };

3. 회귀 테스트

데이터 변화 감지

-- definitions/assertions/detect_data_regression.sqlx
config {
  type: "assertion",
  description: "전일 대비 주요 메트릭 변화 감지"
}

WITH current_metrics AS (
  SELECT 
    DATE(CURRENT_DATE()) as metric_date,
    COUNT(DISTINCT customer_id) as active_customers,
    COUNT(DISTINCT order_id) as total_orders,
    SUM(total_amount) as total_revenue
  FROM ${ref("fact_orders")}
  WHERE DATE(order_date) = CURRENT_DATE()
),

previous_metrics AS (
  SELECT 
    DATE(CURRENT_DATE() - 1) as metric_date,
    COUNT(DISTINCT customer_id) as active_customers,
    COUNT(DISTINCT order_id) as total_orders,
    SUM(total_amount) as total_revenue
  FROM ${ref("fact_orders")}
  WHERE DATE(order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
)

SELECT 
  'regression_detected' as alert_type,
  c.metric_date,
  c.active_customers as current_customers,
  p.active_customers as previous_customers,
  ABS(c.active_customers - p.active_customers) / p.active_customers as customer_change_pct
FROM current_metrics c
CROSS JOIN previous_metrics p
WHERE 
  -- 고객 수가 50% 이상 변화
  ABS(c.active_customers - p.active_customers) / p.active_customers > 0.5
  -- 또는 매출이 70% 이상 변화
  OR ABS(c.total_revenue - p.total_revenue) / p.total_revenue > 0.7

버전 관리 및 배포

1. Git 기반 워크플로우

브랜치 전략

# Feature 브랜치 생성
git checkout -b feature/customer-segmentation

# 개발 및 테스트
dataform compile
dataform test

# 변경사항 커밋
git add .
git commit -m "Add customer segmentation tables"

# Pull Request 생성 후 병합
git checkout main
git pull origin main

환경별 배포

# 개발 환경 배포
dataform run --profile=dev --vars='{environment: "dev"}'

# 스테이징 환경 배포  
dataform run --profile=staging --vars='{environment: "staging"}'

# 프로덕션 배포 (특정 태그만)
dataform run --profile=prod --vars='{environment: "prod"}' --tags=production

2. CI/CD 파이프라인

GitHub Actions 워크플로우

# .github/workflows/dataform.yml
name: Dataform CI/CD

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    
    - name: Setup Node.js
      uses: actions/setup-node@v2
      with:
        node-version: '16'
        
    - name: Install dependencies
      run: npm install
      
    - name: Compile Dataform
      run: npx dataform compile
      
    - name: Run tests
      run: npx dataform test --profile=ci
      
  deploy-staging:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
    - uses: actions/checkout@v2
    
    - name: Deploy to staging
      run: |
        npx dataform run --profile=staging
        
  deploy-production:
    needs: [test, deploy-staging]
    runs-on: ubuntu-latest
    if: github.event_name == 'release'
    steps:
    - uses: actions/checkout@v2
    
    - name: Deploy to production
      run: |
        npx dataform run --profile=prod --tags=production

3. 점진적 배포 전략

Blue-Green 배포

-- definitions/staging/stg_orders_v2.sqlx
config {
  type: "table",
  schema: "staging_v2", -- 새 버전은 별도 스키마에 배포
  disabled: dataform.projectConfig.vars.enable_v2 !== true
}

-- 개선된 스테이징 로직
SELECT 
  order_id,
  customer_id,
  order_date,
  total_amount,
  -- 새로운 데이터 품질 검증
  CASE 
    WHEN total_amount < 0 THEN 0
    ELSE total_amount
  END as cleaned_total_amount
FROM ${ref("raw_orders")}
WHERE order_date >= '2023-01-01'

카나리 배포

// includes/deployment.js
const CANARY_PERCENTAGE = dataform.projectConfig.vars.canary_percentage || 5;

function isCanaryCustomer(customerIdColumn) {
  return `MOD(ABS(FARM_FINGERPRINT(CAST(${customerIdColumn} AS STRING))), 100) < ${CANARY_PERCENTAGE}`;
}

module.exports = { isCanaryCustomer };
-- definitions/marts/fact_orders_canary.sqlx
config {
  type: "table",
  disabled: dataform.projectConfig.vars.enable_canary !== true
}

SELECT *
FROM ${ref("stg_orders_v2")}
WHERE ${isCanaryCustomer("customer_id")}

모니터링 및 디버깅

1. 실행 모니터링

Cloud Logging을 통한 로그 분석

-- 실행 로그 쿼리 예제
SELECT 
  timestamp,
  severity,
  jsonPayload.workflow_id,
  jsonPayload.action_name,
  jsonPayload.status,
  jsonPayload.execution_time_ms
FROM `project.dataset.dataform_logs`
WHERE DATE(timestamp) = CURRENT_DATE()
  AND jsonPayload.status = 'FAILED'
ORDER BY timestamp DESC

알림 설정

# monitoring/alerting-policy.yaml
displayName: "Dataform Workflow Failures"
conditions:
  - displayName: "Workflow failure rate"
    conditionThreshold:
      filter: 'resource.type="dataform_workflow"'
      comparison: COMPARISON_GREATER_THAN
      thresholdValue: 0.1
      duration: 300s
notificationChannels:
  - "projects/PROJECT_ID/notificationChannels/CHANNEL_ID"

2. 성능 최적화

쿼리 성능 분석

-- definitions/monitoring/query_performance_analysis.sqlx
config {
  type: "table",
  description: "Dataform 쿼리 성능 분석"
}

SELECT 
  job_id,
  user_email,
  project_id,
  creation_time,
  start_time,
  end_time,
  TIMESTAMP_DIFF(end_time, start_time, SECOND) as execution_duration_sec,
  total_bytes_processed / 1024 / 1024 / 1024 as gb_processed,
  total_slot_ms / 1000 / 60 as slot_minutes,
  query
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE DATE(creation_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
  AND job_type = 'QUERY'
  AND state = 'DONE'
  AND CONTAINS_SUBSTR(query, 'dataform')
ORDER BY execution_duration_sec DESC
LIMIT 100

리소스 사용량 모니터링

-- definitions/monitoring/resource_usage_daily.sqlx
config {
  type: "incremental",
  uniqueKey: ["usage_date", "project_id"]
}

SELECT 
  DATE(usage_start_time) as usage_date,
  project.id as project_id,
  service.description as service_name,
  sku.description as sku_description,
  SUM(usage.amount) as total_usage,
  SUM(cost) as total_cost_usd,
  currency
FROM `project.dataset.gcp_billing_export_v1_BILLING_ACCOUNT_ID`
WHERE DATE(usage_start_time) >= CURRENT_DATE() - 7
  AND service.description LIKE '%BigQuery%'
${ when(incremental(), `
  AND DATE(usage_start_time) > (SELECT MAX(usage_date) FROM ${self()})
`) }
GROUP BY 1, 2, 3, 4, 7

3. 데이터 린리지 추적

메타데이터 수집

-- definitions/monitoring/data_lineage.sqlx
config {
  type: "table",
  description: "데이터 린리지 메타데이터"
}

WITH table_dependencies AS (
  SELECT 
    'fact_orders' as target_table,
    ['stg_orders', 'dim_customers', 'dim_products'] as source_tables
  UNION ALL
  SELECT 
    'dim_customers',
    ['raw_customers', 'raw_customer_addresses']
  -- ... 다른 테이블 의존성들
)

SELECT 
  target_table,
  source_table,
  'dataform' as transformation_tool,
  CURRENT_TIMESTAMP() as last_updated
FROM table_dependencies
CROSS JOIN UNNEST(source_tables) as source_table

컬럼 레벨 린리지

-- definitions/monitoring/column_lineage.sqlx
config { type: "table" }

SELECT 
  'fact_orders' as target_table,
  'customer_id' as target_column,
  'stg_orders' as source_table,
  'customer_id' as source_column,
  'direct' as transformation_type

UNION ALL

SELECT 
  'fact_orders',
  'total_amount_with_tax',
  'stg_orders', 
  'total_amount',
  'calculated' -- total_amount * (1 + tax_rate)

-- ... 추가 컬럼 린리지 매핑

실제 예제

1. E-commerce 데이터 파이프라인

원시 데이터 스테이징

-- definitions/staging/stg_ecommerce_orders.sqlx
config {
  type: "table",
  description: "E-commerce 주문 데이터 스테이징",
  bigquery: {
    partitionBy: "DATE(order_timestamp)",
    clusterBy: ["customer_id", "status"]
  }
}

SELECT 
  -- 기본 주문 정보
  CAST(order_id AS STRING) as order_id,
  CAST(customer_id AS STRING) as customer_id,
  order_timestamp,
  DATE(order_timestamp) as order_date,
  
  -- 주문 상태 정규화
  CASE status
    WHEN 'completed' THEN 'COMPLETED'
    WHEN 'cancelled' THEN 'CANCELLED'
    WHEN 'pending' THEN 'PENDING'
    WHEN 'shipped' THEN 'SHIPPED'
    ELSE 'UNKNOWN'
  END as order_status,
  
  -- 금액 정보 정제
  CAST(subtotal AS FLOAT64) as subtotal,
  CAST(tax_amount AS FLOAT64) as tax_amount,
  CAST(shipping_cost AS FLOAT64) as shipping_cost,
  CAST(subtotal AS FLOAT64) + CAST(tax_amount AS FLOAT64) + CAST(shipping_cost AS FLOAT64) as total_amount,
  
  -- 할인 정보
  COALESCE(CAST(discount_amount AS FLOAT64), 0) as discount_amount,
  discount_code,
  
  -- 배송 정보
  shipping_address,
  REGEXP_EXTRACT(shipping_address, r', ([A-Z]{2}) \d{5}') as shipping_state,
  
  -- 메타데이터
  created_at,
  updated_at,
  CURRENT_TIMESTAMP() as processed_at

FROM ${resolve(dataform.projectConfig.vars.source_dataset + '.raw_orders')}
WHERE 
  order_timestamp IS NOT NULL
  AND customer_id IS NOT NULL
  AND CAST(subtotal AS FLOAT64) >= 0

주문 상품 스테이징

-- definitions/staging/stg_order_items.sqlx
config {
  type: "incremental",
  uniqueKey: ["order_id", "product_id"],
  bigquery: {
    partitionBy: "DATE(order_timestamp)"
  }
}

SELECT 
  CAST(order_id AS STRING) as order_id,
  CAST(product_id AS STRING) as product_id,
  order_timestamp,
  
  -- 수량 및 가격 정보
  CAST(quantity AS INT64) as quantity,
  CAST(unit_price AS FLOAT64) as unit_price,
  CAST(quantity AS INT64) * CAST(unit_price AS FLOAT64) as line_total,
  
  -- 할인 적용
  COALESCE(CAST(item_discount AS FLOAT64), 0) as item_discount,
  (CAST(quantity AS INT64) * CAST(unit_price AS FLOAT64)) - COALESCE(CAST(item_discount AS FLOAT64), 0) as line_total_after_discount,
  
  CURRENT_TIMESTAMP() as processed_at

FROM ${resolve(dataform.projectConfig.vars.source_dataset + '.raw_order_items')}
WHERE 
  order_id IS NOT NULL 
  AND product_id IS NOT NULL
  AND CAST(quantity AS INT64) > 0
  AND CAST(unit_price AS FLOAT64) >= 0

${ when(incremental(), `
  AND order_timestamp > (
    SELECT COALESCE(MAX(order_timestamp), TIMESTAMP('1900-01-01'))
    FROM ${self()}
  )
`) }

고객 차원 테이블

-- definitions/marts/dim_customers.sqlx
config {
  type: "table",
  description: "고객 차원 테이블 - SCD Type 2",
  bigquery: {
    clusterBy: ["customer_id", "is_active"]
  }
}

WITH customer_history AS (
  SELECT 
    CAST(customer_id AS STRING) as customer_id,
    email,
    first_name,
    last_name,
    phone,
    date_of_birth,
    gender,
    
    -- 주소 정보
    address_line1,
    address_line2,
    city,
    state,
    zip_code,
    country,
    
    -- 계정 상태
    account_status,
    registration_date,
    last_login_date,
    
    -- 변경 이력 관리
    updated_at,
    LAG(updated_at) OVER (PARTITION BY customer_id ORDER BY updated_at) as previous_update,
    LEAD(updated_at) OVER (PARTITION BY customer_id ORDER BY updated_at) as next_update,
    
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC) as latest_record
  FROM ${resolve(dataform.projectConfig.vars.source_dataset + '.raw_customers')}
  WHERE customer_id IS NOT NULL
)

SELECT 
  -- 고객 식별자
  customer_id,
  
  -- 개인 정보
  email,
  CONCAT(first_name, ' ', last_name) as full_name,
  first_name,
  last_name,
  phone,
  date_of_birth,
  gender,
  
  -- 연령 계산
  DATE_DIFF(CURRENT_DATE(), date_of_birth, YEAR) as age,
  CASE 
    WHEN DATE_DIFF(CURRENT_DATE(), date_of_birth, YEAR) < 25 THEN '18-24'
    WHEN DATE_DIFF(CURRENT_DATE(), date_of_birth, YEAR) < 35 THEN '25-34'
    WHEN DATE_DIFF(CURRENT_DATE(), date_of_birth, YEAR) < 45 THEN '35-44'
    WHEN DATE_DIFF(CURRENT_DATE(), date_of_birth, YEAR) < 55 THEN '45-54'
    ELSE '55+'
  END as age_group,
  
  -- 주소 정보
  address_line1,
  address_line2,
  city,
  state,
  zip_code,
  country,
  
  -- 계정 정보
  account_status,
  registration_date,
  last_login_date,
  
  -- SCD Type 2 컬럼들
  updated_at as effective_date,
  COALESCE(next_update, TIMESTAMP('2999-12-31')) as end_date,
  CASE WHEN latest_record = 1 THEN true ELSE false END as is_active,
  
  -- 메타데이터
  CURRENT_TIMESTAMP() as processed_at

FROM customer_history

제품 차원 테이블

-- definitions/marts/dim_products.sqlx
config {
  type: "table",
  description: "제품 차원 테이블",
  bigquery: {
    clusterBy: ["category_id", "is_active"]
  }
}

SELECT 
  -- 제품 식별자
  CAST(p.product_id AS STRING) as product_id,
  p.product_name,
  p.product_description,
  p.sku,
  
  -- 카테고리 정보
  CAST(p.category_id AS STRING) as category_id,
  c.category_name,
  c.parent_category_id,
  c.category_hierarchy,
  
  -- 브랜드 정보
  CAST(p.brand_id AS STRING) as brand_id,
  b.brand_name,
  
  -- 가격 정보
  CAST(p.unit_price AS FLOAT64) as unit_price,
  CAST(p.cost AS FLOAT64) as unit_cost,
  CAST(p.unit_price AS FLOAT64) - CAST(p.cost AS FLOAT64) as unit_margin,
  SAFE_DIVIDE(CAST(p.unit_price AS FLOAT64) - CAST(p.cost AS FLOAT64), CAST(p.unit_price AS FLOAT64)) as margin_percentage,
  
  -- 제품 속성
  p.color,
  p.size,
  p.weight,
  p.dimensions,
  
  -- 재고 정보
  CAST(p.stock_quantity AS INT64) as current_stock,
  CAST(p.reorder_level AS INT64) as reorder_level,
  CASE 
    WHEN CAST(p.stock_quantity AS INT64) <= CAST(p.reorder_level AS INT64) THEN 'LOW_STOCK'
    WHEN CAST(p.stock_quantity AS INT64) = 0 THEN 'OUT_OF_STOCK'
    ELSE 'IN_STOCK'
  END as stock_status,
  
  -- 상태 정보
  p.is_active,
  p.launch_date,
  p.discontinued_date,
  
  -- 메타데이터
  p.created_at,
  p.updated_at,
  CURRENT_TIMESTAMP() as processed_at

FROM ${resolve(dataform.projectConfig.vars.source_dataset + '.raw_products')} p
LEFT JOIN ${resolve(dataform.projectConfig.vars.source_dataset + '.raw_categories')} c
  ON p.category_id = c.category_id
LEFT JOIN ${resolve(dataform.projectConfig.vars.source_dataset + '.raw_brands')} b
  ON p.brand_id = b.brand_id
WHERE p.product_id IS NOT NULL

주문 팩트 테이블

-- definitions/marts/fact_orders.sqlx
config {
  type: "incremental",
  uniqueKey: ["order_id"],
  bigquery: {
    partitionBy: "order_date",
    clusterBy: ["customer_id", "order_status"]
  }
}

SELECT 
  -- 주문 식별자
  o.order_id,
  
  -- 고객 외래키
  o.customer_id,
  
  -- 날짜 정보
  o.order_timestamp,
  o.order_date,
  EXTRACT(YEAR FROM o.order_date) as order_year,
  EXTRACT(MONTH FROM o.order_date) as order_month,
  EXTRACT(DAYOFWEEK FROM o.order_date) as day_of_week,
  CASE EXTRACT(DAYOFWEEK FROM o.order_date)
    WHEN 1 THEN 'Sunday'
    WHEN 2 THEN 'Monday'
    WHEN 3 THEN 'Tuesday'
    WHEN 4 THEN 'Wednesday'
    WHEN 5 THEN 'Thursday'
    WHEN 6 THEN 'Friday'
    WHEN 7 THEN 'Saturday'
  END as day_name,
  
  -- 주문 상태
  o.order_status,
  
  -- 금액 정보
  o.subtotal,
  o.tax_amount,
  o.shipping_cost,
  o.discount_amount,
  o.total_amount,
  o.total_amount - o.discount_amount as net_amount,
  
  -- 할인 정보
  o.discount_code,
  CASE WHEN o.discount_amount > 0 THEN true ELSE false END as has_discount,
  SAFE_DIVIDE(o.discount_amount, o.subtotal) as discount_percentage,
  
  -- 배송 정보
  o.shipping_address,
  o.shipping_state,
  
  -- 주문 상품 집계
  oi.total_items,
  oi.total_quantity,
  oi.avg_item_price,
  
  -- 고객 정보 (스냅샷)
  c.customer_name,
  c.customer_email,
  c.customer_age_group,
  c.customer_state,
  c.customer_registration_date,
  
  -- 계산된 메트릭
  DATE_DIFF(o.order_date, c.customer_registration_date, DAY) as days_since_registration,
  CASE 
    WHEN DATE_DIFF(o.order_date, c.customer_registration_date, DAY) <= 30 THEN 'NEW_CUSTOMER'
    WHEN DATE_DIFF(o.order_date, c.customer_registration_date, DAY) <= 365 THEN 'RETURNING_CUSTOMER'
    ELSE 'LOYAL_CUSTOMER'
  END as customer_segment,
  
  -- 메타데이터
  o.processed_at

FROM ${ref("stg_ecommerce_orders")} o

-- 주문 상품 집계 조인
LEFT JOIN (
  SELECT 
    order_id,
    COUNT(DISTINCT product_id) as total_items,
    SUM(quantity) as total_quantity,
    AVG(unit_price) as avg_item_price
  FROM ${ref("stg_order_items")}
  GROUP BY order_id
) oi ON o.order_id = oi.order_id

-- 고객 정보 조인 (현재 활성 레코드)
LEFT JOIN (
  SELECT 
    customer_id,
    full_name as customer_name,
    email as customer_email,
    age_group as customer_age_group,
    state as customer_state,
    registration_date as customer_registration_date
  FROM ${ref("dim_customers")}
  WHERE is_active = true
) c ON o.customer_id = c.customer_id

${ when(incremental(), `
WHERE o.order_timestamp > (
  SELECT COALESCE(MAX(order_timestamp), TIMESTAMP('1900-01-01'))
  FROM ${self()}
)
`) }

2. 마케팅 성과 분석

마케팅 캠페인 성과 마트

-- definitions/marts/marketing_campaign_performance.sqlx
config {
  type: "table",
  description: "마케팅 캠페인별 성과 분석",
  bigquery: {
    partitionBy: "DATE(campaign_date)",
    clusterBy: ["campaign_id", "channel"]
  }
}

WITH campaign_metrics AS (
  SELECT 
    c.campaign_id,
    c.campaign_name,
    c.channel,
    c.campaign_type,
    DATE(c.start_date) as campaign_date,
    c.budget_amount,
    
    -- 광고 지출
    SUM(ad.spend_amount) as total_spend,
    SUM(ad.impressions) as total_impressions,
    SUM(ad.clicks) as total_clicks,
    SAFE_DIVIDE(SUM(ad.clicks), SUM(ad.impressions)) as ctr,
    SAFE_DIVIDE(SUM(ad.spend_amount), SUM(ad.clicks)) as cpc,
    
    -- 전환 지표
    COUNT(DISTINCT CASE WHEN o.order_id IS NOT NULL THEN c.customer_id END) as converted_customers,
    COUNT(DISTINCT o.order_id) as total_orders,
    SUM(o.total_amount) as total_revenue,
    
    -- 계산된 지표
    SAFE_DIVIDE(COUNT(DISTINCT CASE WHEN o.order_id IS NOT NULL THEN c.customer_id END), 
                COUNT(DISTINCT c.customer_id)) as conversion_rate,
    SAFE_DIVIDE(SUM(o.total_amount), SUM(ad.spend_amount)) as roas,
    SAFE_DIVIDE(SUM(ad.spend_amount), COUNT(DISTINCT CASE WHEN o.order_id IS NOT NULL THEN c.customer_id END)) as cac
    
  FROM ${resolve(dataform.projectConfig.vars.source_dataset + '.campaigns')} c
  LEFT JOIN ${resolve(dataform.projectConfig.vars.source_dataset + '.ad_spend')} ad
    ON c.campaign_id = ad.campaign_id
  LEFT JOIN ${resolve(dataform.projectConfig.vars.source_dataset + '.campaign_customers')} cc
    ON c.campaign_id = cc.campaign_id
  LEFT JOIN ${ref("fact_orders")} o
    ON cc.customer_id = o.customer_id
    AND DATE(o.order_date) BETWEEN DATE(c.start_date) AND DATE(c.end_date)
  
  GROUP BY 1, 2, 3, 4, 5, 6
)

SELECT 
  *,
  -- 성과 등급
  CASE 
    WHEN roas >= 4.0 THEN 'EXCELLENT'
    WHEN roas >= 2.0 THEN 'GOOD' 
    WHEN roas >= 1.0 THEN 'BREAK_EVEN'
    ELSE 'POOR'
  END as performance_grade,
  
  -- 예산 효율성
  SAFE_DIVIDE(total_spend, budget_amount) as budget_utilization,
  budget_amount - total_spend as remaining_budget

FROM campaign_metrics

3. 고객 세분화 및 LTV 분석

고객 세분화 모델

-- definitions/marts/customer_segmentation.sqlx
config {
  type: "table",
  description: "RFM 기반 고객 세분화",
  bigquery: {
    clusterBy: ["segment", "customer_id"]
  }
}

WITH customer_rfm AS (
  SELECT 
    c.customer_id,
    c.customer_name,
    c.registration_date,
    
    -- Recency: 마지막 구매 이후 일수
    DATE_DIFF(CURRENT_DATE(), MAX(o.order_date), DAY) as recency_days,
    
    -- Frequency: 총 주문 횟수
    COUNT(DISTINCT o.order_id) as frequency,
    
    -- Monetary: 총 구매 금액
    SUM(o.total_amount) as monetary,
    
    -- 추가 메트릭
    AVG(o.total_amount) as avg_order_value,
    MIN(o.order_date) as first_order_date,
    MAX(o.order_date) as last_order_date,
    DATE_DIFF(MAX(o.order_date), MIN(o.order_date), DAY) + 1 as customer_lifetime_days
    
  FROM ${ref("dim_customers")} c
  LEFT JOIN ${ref("fact_orders")} o
    ON c.customer_id = o.customer_id
    AND o.order_status = 'COMPLETED'
  WHERE c.is_active = true
  GROUP BY 1, 2, 3
),

rfm_scores AS (
  SELECT 
    *,
    -- RFM 점수 계산 (1-5 척도)
    CASE 
      WHEN recency_days <= 30 THEN 5
      WHEN recency_days <= 90 THEN 4
      WHEN recency_days <= 180 THEN 3
      WHEN recency_days <= 365 THEN 2
      ELSE 1
    END as r_score,
    
    CASE 
      WHEN frequency >= 10 THEN 5
      WHEN frequency >= 5 THEN 4
      WHEN frequency >= 3 THEN 3
      WHEN frequency >= 2 THEN 2
      ELSE 1
    END as f_score,
    
    CASE 
      WHEN monetary >= 1000 THEN 5
      WHEN monetary >= 500 THEN 4
      WHEN monetary >= 250 THEN 3
      WHEN monetary >= 100 THEN 2
      ELSE 1
    END as m_score
    
  FROM customer_rfm
)

SELECT 
  customer_id,
  customer_name,
  registration_date,
  recency_days,
  frequency,
  monetary,
  avg_order_value,
  first_order_date,
  last_order_date,
  customer_lifetime_days,
  
  -- RFM 점수
  r_score,
  f_score, 
  m_score,
  CONCAT(CAST(r_score AS STRING), CAST(f_score AS STRING), CAST(m_score AS STRING)) as rfm_code,
  
  -- 세분화 결과
  CASE 
    WHEN r_score >= 4 AND f_score >= 4 AND m_score >= 4 THEN 'CHAMPIONS'
    WHEN r_score >= 3 AND f_score >= 4 AND m_score >= 4 THEN 'LOYAL_CUSTOMERS'
    WHEN r_score >= 4 AND f_score <= 2 AND m_score >= 3 THEN 'POTENTIAL_LOYALISTS'
    WHEN r_score >= 4 AND f_score <= 2 AND m_score <= 2 THEN 'NEW_CUSTOMERS'
    WHEN r_score >= 3 AND f_score >= 3 AND m_score >= 3 THEN 'PROMISING'
    WHEN r_score >= 3 AND f_score <= 2 AND m_score <= 2 THEN 'NEED_ATTENTION'
    WHEN r_score <= 2 AND f_score >= 3 AND m_score >= 3 THEN 'AT_RISK'
    WHEN r_score <= 2 AND f_score >= 4 AND m_score >= 4 THEN 'CANT_LOSE_THEM'
    WHEN r_score <= 2 AND f_score <= 2 AND m_score >= 3 THEN 'HIBERNATING'
    ELSE 'LOST'
  END as segment,
  
  -- 예상 생애가치 (간단한 모델)
  CASE 
    WHEN customer_lifetime_days > 0 
    THEN (monetary / customer_lifetime_days) * 365 * 2 -- 향후 2년 예상
    ELSE avg_order_value * 4 -- 신규 고객 추정
  END as predicted_clv,
  
  CURRENT_TIMESTAMP() as processed_at

FROM rfm_scores

베스트 프랙티스

1. 코드 조직화

폴더 구조 모범 사례

definitions/
├── sources/              # 소스 데이터 문서화
│   ├── raw_orders.sql
│   └── raw_customers.sql
├── staging/              # 원시 데이터 정제
│   ├── ecommerce/
│   │   ├── stg_orders.sqlx
│   │   └── stg_customers.sqlx
│   └── marketing/
│       ├── stg_campaigns.sqlx
│       └── stg_ad_spend.sqlx
├── intermediate/         # 중간 변환 단계
│   ├── int_order_enriched.sqlx
│   └── int_customer_metrics.sqlx
├── marts/               # 최종 비즈니스 테이블
│   ├── core/           # 핵심 차원/팩트 테이블
│   │   ├── dim_customers.sqlx
│   │   ├── dim_products.sqlx
│   │   └── fact_orders.sqlx
│   ├── marketing/      # 마케팅 분석용
│   │   ├── customer_segmentation.sqlx
│   │   └── campaign_performance.sqlx
│   └── finance/        # 재무 분석용
│       ├── revenue_analysis.sqlx
│       └── cost_analysis.sqlx
├── assertions/          # 데이터 품질 검증
│   ├── core/
│   │   ├── assert_unique_orders.sqlx
│   │   └── assert_positive_amounts.sqlx
│   └── business_rules/
│       ├── assert_order_logic.sqlx
│       └── assert_customer_consistency.sqlx
└── utils/              # 유틸리티 뷰
    ├── date_spine.sqlx
    └── business_calendar.sqlx

명명 규칙 표준화

-- 파일명 규칙
-- [layer]_[business_area]_[entity].sqlx

-- staging: stg_[source]_[entity].sqlx
-- definitions/staging/ecommerce/stg_shopify_orders.sqlx

-- intermediate: int_[description].sqlx  
-- definitions/intermediate/int_orders_with_customers.sqlx

-- marts: [entity_type]_[entity].sqlx
-- definitions/marts/core/dim_customers.sqlx
-- definitions/marts/core/fact_orders.sqlx

-- assertions: assert_[rule_description].sqlx
-- definitions/assertions/core/assert_unique_customer_ids.sqlx

2. 성능 최적화

파티셔닝 전략

-- 날짜 기반 파티셔닝 (가장 일반적)
config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(created_date)",
    requirePartitionFilter: true,
    partitionExpirationDays: 1095 // 3
  }
}

-- 정수 범위 파티셔닝
config {
  type: "table", 
  bigquery: {
    partitionBy: "RANGE_BUCKET(customer_id, GENERATE_ARRAY(0, 1000000, 1000))"
  }
}

클러스터링 최적화

-- 쿼리 패턴에 맞는 클러스터링
config {
  type: "table",
  bigquery: {
    partitionBy: "DATE(order_date)",
    clusterBy: ["customer_id", "product_category", "region"] // 자주 필터링/조인되는 컬럼 순서로
  }
}

증분 처리 최적화

-- 효율적인 증분 처리 패턴
config {
  type: "incremental",
  uniqueKey: ["order_id"],
  bigquery: {
    partitionBy: "DATE(order_date)",
    updatePartitionFilter: "DATE(order_date) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)" // 최근 3일만 업데이트
  }
}

SELECT *
FROM ${ref("source_table")}
${ when(incremental(), `
WHERE 
  updated_at > (SELECT MAX(updated_at) FROM ${self()})
  OR order_id IN (
    SELECT order_id FROM ${ref("updated_orders")} -- 특정 업데이트 목록
  )
`) }

3. 데이터 품질 관리

포괄적인 데이터 검증

-- definitions/assertions/comprehensive_order_validation.sqlx
config {
  type: "assertion",
  description: "주문 데이터 종합 품질 검증"
}

-- 1. 필수 필드 누락 검사
SELECT 'missing_required_fields' as check_type, order_id
FROM ${ref("fact_orders")}
WHERE customer_id IS NULL OR order_date IS NULL OR total_amount IS NULL

UNION ALL

-- 2. 비즈니스 룰 검증
SELECT 'negative_amounts' as check_type, order_id
FROM ${ref("fact_orders")}  
WHERE total_amount < 0 OR subtotal < 0

UNION ALL

-- 3. 참조 무결성 검증
SELECT 'orphaned_orders' as check_type, o.order_id
FROM ${ref("fact_orders")} o
LEFT JOIN ${ref("dim_customers")} c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL

UNION ALL

-- 4. 중복 데이터 검사
SELECT 'duplicate_orders' as check_type, order_id
FROM ${ref("fact_orders")}
GROUP BY order_id
HAVING COUNT(*) > 1

UNION ALL

-- 5. 데이터 분포 이상 감지
SELECT 'suspicious_outliers' as check_type, order_id
FROM ${ref("fact_orders")}
WHERE total_amount > (
  SELECT PERCENTILE_CONT(total_amount, 0.99) OVER()
  FROM ${ref("fact_orders")}
  LIMIT 1
) * 10 -- 99%ile의 10배를 초과하는 경우

데이터 품질 메트릭 대시보드

-- definitions/monitoring/data_quality_dashboard.sqlx
config {
  type: "table",
  description: "일일 데이터 품질 메트릭"
}

WITH quality_checks AS (
  SELECT 
    'customers' as table_name,
    CURRENT_DATE() as check_date,
    COUNT(*) as total_records,
    COUNTIF(customer_id IS NULL) as null_ids,
    COUNTIF(email IS NULL OR email = '') as missing_emails,
    COUNTIF(NOT REGEXP_CONTAINS(email, r'^[^@]+@[^@]+\.[^@]+$')) as invalid_emails,
    COUNT(DISTINCT customer_id) as unique_customers
  FROM ${ref("dim_customers")}
  WHERE is_active = true
  
  UNION ALL
  
  SELECT 
    'orders',
    CURRENT_DATE(),
    COUNT(*),
    COUNTIF(order_id IS NULL),
    COUNTIF(customer_id IS NULL), 
    COUNTIF(total_amount IS NULL),
    COUNT(DISTINCT order_id)
  FROM ${ref("fact_orders")}
  WHERE order_date = CURRENT_DATE()
)

SELECT 
  *,
  -- 품질 점수 계산
  1 - (null_ids + missing_emails + invalid_emails) / total_records as quality_score,
  
  -- 경고 플래그
  CASE 
    WHEN (null_ids + missing_emails + invalid_emails) / total_records > 0.05 THEN 'CRITICAL'
    WHEN (null_ids + missing_emails + invalid_emails) / total_records > 0.01 THEN 'WARNING'
    ELSE 'OK'
  END as quality_status

FROM quality_checks

4. 문서화 및 메타데이터

자세한 테이블 문서화

-- definitions/marts/core/dim_customers.sqlx
config {
  type: "table",
  description: "고객 차원 테이블 - 모든 고객의 마스터 데이터를 포함합니다. SCD Type 2로 구현되어 고객 정보 변경 이력을 추적합니다.",
  columns: {
    customer_id: "고유 고객 식별자 (Primary Key)",
    customer_name: "고객 성명",
    email: "이메일 주소 (고유값, 마케팅 커뮤니케이션에 사용)",
    phone: "전화번호",
    date_of_birth: "생년월일",
    age: "현재 나이 (매일 계산됨)",
    age_group: "연령대 구분 (18-24, 25-34, 35-44, 45-54, 55+)",
    registration_date: "고객 등록일",
    total_orders: "총 주문 횟수",
    total_spent: "총 구매 금액 (USD)",
    avg_order_value: "평균 주문 금액",
    last_order_date: "최근 주문일",
    customer_segment: "고객 세그먼트 (NEW, RETURNING, LOYAL)",
    is_active: "현재 활성 레코드 여부 (SCD Type 2)",
    effective_date: "레코드 유효 시작일",
    end_date: "레코드 유효 종료일 (2999-12-31이면 현재 활성)",
    processed_at: "데이터 처리 타임스탬프"
  },
  bigquery: {
    partitionBy: "DATE(effective_date)",
    clusterBy: ["customer_id", "is_active"],
    labels: {
      team: "data",
      domain: "customer",
      criticality: "high"
    }
  }
}

비즈니스 로직 문서화

// includes/business_rules.js

/**
 * 고객 세분화 로직
 * 
 * 비즈니스 규칙:
 * - NEW: 첫 구매 후 30일 이내
 * - RETURNING: 첫 구매 후 30일 초과, 1년 이내
 * - LOYAL: 첫 구매 후 1년 초과
 * - CHURNED: 최근 구매가 90일 이전
 * 
 * @param {string} registrationDate - 고객 등록일 컬럼명
 * @param {string} lastOrderDate - 최근 주문일 컬럼명
 * @returns {string} CASE문을 포함한 SQL 표현식
 */
function customerSegmentation(registrationDate, lastOrderDate) {
  return `
    CASE 
      WHEN DATE_DIFF(CURRENT_DATE(), ${lastOrderDate}, DAY) > 90 THEN 'CHURNED'
      WHEN DATE_DIFF(CURRENT_DATE(), ${registrationDate}, DAY) <= 30 THEN 'NEW'
      WHEN DATE_DIFF(CURRENT_DATE(), ${registrationDate}, DAY) <= 365 THEN 'RETURNING'
      ELSE 'LOYAL'
    END
  `;
}

module.exports = { customerSegmentation };

사용 요금 및 비용 최적화

1. DataForm 요금 체계

DataForm 서비스 요금

Google Cloud DataForm 자체는 완전 무료로 제공됩니다:

  • DataForm 서비스 이용료: 무료
  • 워크스페이스 생성 및 관리: 무료
  • Git 저장소 연동: 무료
  • 워크플로우 실행 관리: 무료
  • 웹 IDE 사용: 무료

실제 발생 비용

DataForm을 통해 실행되는 쿼리는 BigQuery 요금 체계를 따릅니다:

# 비용 발생 요소
BigQuery 요금:
  - 쿼리 처리 비용 (온디맨드 또는 슬롯 기반)
  - 스토리지 비용
  - 네트워크 비용 (리전 간 이동 시)
  
DataForm 관련:
  - 워크플로우 실행 시 BigQuery 쿼리 비용
  - 테스트 실행 시 BigQuery 비용
  - 어서션 검증 시 BigQuery 비용

2. BigQuery 요금 체계

2.1 쿼리 처리 비용

온디맨드 가격 (2024년 기준):

  • 미국 (멀티리전): $6.00 per TB 스캔된 데이터
  • 기타 지역: $6.60 per TB 스캔된 데이터
-- 예상 비용 계산 예제
-- definitions/monitoring/query_cost_estimation.sqlx
config { type: "table" }

WITH query_analysis AS (
  SELECT 
    job_id,
    user_email,
    creation_time,
    total_bytes_processed / 1024 / 1024 / 1024 / 1024 as tb_processed,
    -- 미국 기준 $6.00/TB
    (total_bytes_processed / 1024 / 1024 / 1024 / 1024) * 6.0 as estimated_cost_usd,
    query
  FROM `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
  WHERE 
    DATE(creation_time) = CURRENT_DATE()
    AND job_type = 'QUERY'
    AND state = 'DONE'
    AND CONTAINS_SUBSTR(query, 'dataform')
)

SELECT 
  DATE(creation_time) as query_date,
  COUNT(*) as total_dataform_queries,
  ROUND(SUM(tb_processed), 3) as total_tb_processed,
  ROUND(SUM(estimated_cost_usd), 2) as estimated_daily_cost_usd
FROM query_analysis
GROUP BY DATE(creation_time)
ORDER BY query_date DESC

슬롯 기반 가격 (예약 용량):

  • Flex Slots: $0.04 per slot per hour
  • 연간 약정: $1,700 per 100 slots per month
  • 월간 약정: $2,000 per 100 slots per month

2.2 스토리지 비용

  • Active Storage: $0.020 per GB per month
  • Long-term Storage (90일 미사용): $0.010 per GB per month
-- 스토리지 비용 모니터링
-- definitions/monitoring/storage_cost_analysis.sqlx
config { type: "table" }

SELECT 
  table_schema,
  table_name,
  ROUND(size_bytes / 1024 / 1024 / 1024, 2) as size_gb,
  -- Active storage 기준 $0.020/GB/month
  ROUND((size_bytes / 1024 / 1024 / 1024) * 0.020, 2) as monthly_storage_cost_usd,
  last_modified_time,
  CASE 
    WHEN TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), last_modified_time, DAY) > 90 
    THEN ROUND((size_bytes / 1024 / 1024 / 1024) * 0.010, 2)
    ELSE ROUND((size_bytes / 1024 / 1024 / 1024) * 0.020, 2)
  END as actual_monthly_cost_usd
FROM `region-us.INFORMATION_SCHEMA.TABLE_STORAGE_BY_PROJECT`
WHERE table_schema NOT IN ('INFORMATION_SCHEMA', 'sys')
ORDER BY size_gb DESC

3. DataForm 비용 최적화 전략

3.1 쿼리 최적화를 통한 비용 절감

파티셔닝 활용:

-- 파티션 필터를 통한 스캔 데이터 최소화
config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(order_date)",
    requirePartitionFilter: true
  }
}

SELECT *
FROM ${ref("fact_orders")}
WHERE DATE(order_date) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) -- 파티션 필터 필수

증분 처리를 통한 비용 절감:

-- 전체 재처리 대신 증분 처리로 비용 90% 절약 가능
config {
  type: "incremental",
  uniqueKey: ["order_id"]
}

SELECT *
FROM ${ref("raw_orders")}
${ when(incremental(), `
WHERE updated_at > (SELECT MAX(updated_at) FROM ${self()})
`) }

컬럼 선택을 통한 스캔 최소화:

-- ❌ 비효율적 - 전체 컬럼 스캔
SELECT * FROM large_table

-- ✅ 효율적 - 필요한 컬럼만 선택
SELECT 
  order_id,
  customer_id,
  total_amount,
  order_date
FROM large_table

3.2 테스트 비용 최적화

-- 테스트용 샘플 데이터 사용으로 비용 절약
-- definitions/staging/stg_orders_sample.sqlx
config {
  type: "table",
  disabled: dataform.projectConfig.vars.environment !== 'test'
}

SELECT *
FROM ${ref("raw_orders")}
WHERE MOD(ABS(FARM_FINGERPRINT(order_id)), 100) < 1 -- 1% 샘플링

3.3 스케줄링을 통한 효율적 실행

// includes/cost_optimization.js

// 업무 시간 외 실행으로 슬롯 경쟁 최소화
function isOffPeakHours() {
  const currentHour = new Date().getHours();
  return currentHour < 8 || currentHour > 18;
}

// 배치 처리로 쿼리 최적화
function shouldRunFullRefresh(tableName) {
  const dayOfWeek = new Date().getDay();
  // 주말에만 전체 갱신 실행
  return dayOfWeek === 0 || dayOfWeek === 6;
}

module.exports = { isOffPeakHours, shouldRunFullRefresh };

4. 비용 모니터링 및 알림

4.1 실시간 비용 추적

-- definitions/monitoring/daily_cost_tracking.sqlx
config {
  type: "incremental",
  uniqueKey: ["cost_date", "project_id"]
}

WITH daily_costs AS (
  SELECT 
    DATE(usage_start_time) as cost_date,
    project.id as project_id,
    service.description as service_name,
    SUM(cost) as total_cost_usd,
    SUM(CASE WHEN sku.description LIKE '%Query%' THEN cost ELSE 0 END) as query_cost_usd,
    SUM(CASE WHEN sku.description LIKE '%Storage%' THEN cost ELSE 0 END) as storage_cost_usd
  FROM `project.dataset.gcp_billing_export_v1_BILLING_ACCOUNT_ID`
  WHERE 
    service.description = 'BigQuery'
    AND DATE(usage_start_time) >= CURRENT_DATE() - 30
  GROUP BY 1, 2, 3
)

SELECT 
  *,
  -- 전일 대비 증가율 계산
  LAG(total_cost_usd) OVER (
    PARTITION BY project_id 
    ORDER BY cost_date
  ) as previous_day_cost,
  
  SAFE_DIVIDE(
    total_cost_usd - LAG(total_cost_usd) OVER (PARTITION BY project_id ORDER BY cost_date),
    LAG(total_cost_usd) OVER (PARTITION BY project_id ORDER BY cost_date)
  ) * 100 as cost_change_pct

FROM daily_costs
${ when(incremental(), `
WHERE cost_date > (SELECT MAX(cost_date) FROM ${self()})
`) }

4.2 비용 알림 설정

-- definitions/assertions/cost_alert_thresholds.sqlx
config {
  type: "assertion",
  description: "일일 비용 임계값 초과 시 알림"
}

WITH cost_summary AS (
  SELECT 
    DATE(usage_start_time) as cost_date,
    SUM(cost) as daily_cost_usd
  FROM `project.dataset.gcp_billing_export_v1_BILLING_ACCOUNT_ID`
  WHERE 
    service.description = 'BigQuery'
    AND DATE(usage_start_time) = CURRENT_DATE()
  GROUP BY 1
)

SELECT 
  cost_date,
  daily_cost_usd,
  'DAILY_COST_THRESHOLD_EXCEEDED' as alert_type
FROM cost_summary
WHERE daily_cost_usd > 100 -- $100 임계값 설정

5. 비용 최적화 베스트 프랙티스

5.1 개발 환경 비용 최적화

// .df/profiles.json - 환경별 설정
{
  "dev": {
    "projectId": "my-project-dev",
    "location": "US",
    "vars": {
      "sample_percentage": 0.01,
      "enable_clustering": false,
      "max_rows_for_preview": 1000
    }
  },
  "prod": {
    "projectId": "my-project-prod", 
    "location": "US",
    "vars": {
      "sample_percentage": 1.0,
      "enable_clustering": true,
      "max_rows_for_preview": 10000
    }
  }
}
-- 환경별 데이터 샘플링
-- definitions/staging/stg_orders_optimized.sqlx
config {
  type: "table",
  disabled: dataform.projectConfig.vars.environment === 'dev' 
           && dataform.projectConfig.vars.sample_percentage < 1.0
}

SELECT *
FROM ${ref("raw_orders")}
${ when(dataform.projectConfig.vars.environment === 'dev', `
WHERE MOD(ABS(FARM_FINGERPRINT(CAST(order_id AS STRING))), 100) 
      < ${dataform.projectConfig.vars.sample_percentage * 100}
`) }

5.2 쿼리 성능 모니터링

-- definitions/monitoring/expensive_queries_analysis.sqlx
config {
  type: "table",
  description: "비용이 높은 쿼리 분석"
}

SELECT 
  job_id,
  user_email,
  creation_time,
  ROUND(total_bytes_processed / 1024 / 1024 / 1024, 2) as gb_processed,
  ROUND(total_slot_ms / 1000 / 60, 2) as slot_minutes,
  ROUND((total_bytes_processed / 1024 / 1024 / 1024 / 1024) * 6.0, 2) as estimated_cost_usd,
  -- 쿼리 패턴 분석
  CASE 
    WHEN CONTAINS_SUBSTR(query, 'SELECT *') THEN 'FULL_TABLE_SCAN'
    WHEN CONTAINS_SUBSTR(query, 'GROUP BY') AND NOT CONTAINS_SUBSTR(query, 'LIMIT') THEN 'LARGE_AGGREGATION'
    WHEN CONTAINS_SUBSTR(query, 'JOIN') AND NOT CONTAINS_SUBSTR(query, 'WHERE') THEN 'CARTESIAN_JOIN_RISK'
    ELSE 'OPTIMIZED'
  END as query_pattern,
  LEFT(query, 200) as query_preview
FROM `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE 
  DATE(creation_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
  AND job_type = 'QUERY'
  AND state = 'DONE'
  AND total_bytes_processed > 10 * 1024 * 1024 * 1024 -- 10GB 이상만
ORDER BY estimated_cost_usd DESC
LIMIT 50

6. 예상 월간 비용 계산기

-- definitions/monitoring/monthly_cost_forecast.sqlx
config { 
  type: "table",
  description: "월간 예상 비용 계산"
}

WITH usage_trends AS (
  SELECT 
    DATE_TRUNC(DATE(creation_time), MONTH) as month,
    COUNT(*) as total_queries,
    SUM(total_bytes_processed) / 1024 / 1024 / 1024 / 1024 as total_tb_processed,
    SUM(total_bytes_processed) / 1024 / 1024 / 1024 / 1024 * 6.0 as total_query_cost_usd
  FROM `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
  WHERE 
    DATE(creation_time) >= DATE_SUB(DATE_TRUNC(CURRENT_DATE(), MONTH), INTERVAL 3 MONTH)
    AND job_type = 'QUERY'
    AND state = 'DONE'
  GROUP BY 1
),

storage_costs AS (
  SELECT 
    DATE_TRUNC(CURRENT_DATE(), MONTH) as month,
    SUM(size_bytes) / 1024 / 1024 / 1024 as total_storage_gb,
    SUM(size_bytes) / 1024 / 1024 / 1024 * 0.020 as monthly_storage_cost_usd
  FROM `region-us.INFORMATION_SCHEMA.TABLE_STORAGE_BY_PROJECT`
)

SELECT 
  u.month,
  u.total_queries,
  ROUND(u.total_tb_processed, 3) as tb_processed,
  ROUND(u.total_query_cost_usd, 2) as query_cost_usd,
  ROUND(s.monthly_storage_cost_usd, 2) as storage_cost_usd,
  ROUND(u.total_query_cost_usd + COALESCE(s.monthly_storage_cost_usd, 0), 2) as total_monthly_cost_usd,
  
  -- 다음 달 예상 비용 (최근 3개월 평균 기준)
  ROUND(
    AVG(u.total_query_cost_usd) OVER (
      ORDER BY u.month 
      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) + COALESCE(s.monthly_storage_cost_usd, 0), 
    2
  ) as forecasted_next_month_usd

FROM usage_trends u
LEFT JOIN storage_costs s ON u.month = s.month
ORDER BY u.month DESC

7. 비용 절약 체크리스트

✅ 쿼리 최적화

  • 파티션 필터 사용 (WHERE DATE(column) >= '2024-01-01')
  • 필요한 컬럼만 선택 (SELECT * 지양)
  • 증분 처리 활용 (type: "incremental")
  • 클러스터링으로 쿼리 성능 향상
  • 적절한 데이터 타입 사용

✅ 테이블 최적화

  • 파티션 만료 정책 설정 (partitionExpirationDays)
  • 불필요한 테이블 정리
  • 중복 데이터 제거
  • 압축률 높은 데이터 타입 사용

✅ 개발 프로세스

  • 개발 환경에서 데이터 샘플링 사용
  • 테스트 쿼리 최소화
  • 비용 모니터링 대시보드 구축
  • 정기적인 비용 리뷰 실시

✅ 운영 최적화

  • 배치 작업은 off-peak 시간에 실행
  • 슬롯 예약 구매 검토 (대용량 워크로드 시)
  • 리전 간 데이터 이동 최소화
  • 자동화된 비용 알림 설정

트러블슈팅

1. 일반적인 오류 및 해결책

1.1 순환 의존성 오류

Error: Circular dependency detected: table_a -> table_b -> table_a

해결책:

// includes/temp_tables.js
function createTempTable(tableName, query) {
  return `CREATE OR REPLACE TABLE ${tableName}_temp AS (${query})`;
}

// 순환 의존성을 끊는 임시 테이블 사용
config {
  type: "table",
  preOps: [
    createTempTable(self(), `SELECT customer_id, COUNT(*) as order_count FROM raw_orders GROUP BY 1`)
  ]
}

1.2 메모리 부족 오류

Error: Resources exceeded during query execution

해결책:

-- 배치 처리로 대용량 데이터 처리
config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(order_date)",
    updatePartitionFilter: "DATE(order_date) = CURRENT_DATE()" // 하루씩 처리
  }
}

-- 또는 WITH문으로 중간 결과 구체화
WITH large_aggregation AS (
  SELECT 
    customer_id,
    SUM(amount) as total_amount
  FROM ${ref("fact_orders")}
  WHERE DATE(order_date) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  GROUP BY customer_id
)
SELECT * FROM large_aggregation

1.3 파티션 필터 누락 오류

Error: Cannot query over table without partition filter

해결책:

-- 파티션 필터를 항상 포함
SELECT *
FROM ${ref("partitioned_table")}
WHERE DATE(created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY) -- 필수 파티션 필터

-- 또는 파티션 요구사항 제거
config {
  bigquery: {
    partitionBy: "DATE(created_at)",
    requirePartitionFilter: false
  }
}

2. 성능 문제 해결

2.1 슬롯 부족 문제

-- 쿼리 우선순위 조정
config {
  type: "table",
  bigquery: {
    jobPriority: "INTERACTIVE", -- 또는 "BATCH"
    maximumBytesBilled: 10000000000 -- 10GB로 제한
  }
}

2.2 증분 처리 성능 최적화

-- 효율적인 증분 처리를 위한 워터마크 테이블
-- definitions/utils/processing_watermarks.sqlx
config { type: "table" }

SELECT 
  'orders' as table_name,
  MAX(updated_at) as last_processed_timestamp,
  CURRENT_TIMESTAMP() as watermark_updated_at
FROM ${ref("stg_orders")}
-- 워터마크 기반 증분 처리
${ when(incremental(), `
WHERE updated_at > (
  SELECT last_processed_timestamp 
  FROM ${ref("processing_watermarks")} 
  WHERE table_name = 'orders'
)
`) }

3. 데이터 일관성 문제

3.1 타임존 관련 문제

-- 일관된 타임존 처리
config {
  vars: {
    default_timezone: "America/New_York"
  }
}

SELECT 
  order_id,
  -- 모든 타임스탬프를 동일한 타임존으로 변환
  DATETIME(TIMESTAMP(order_timestamp), "${dataform.projectConfig.vars.default_timezone}") as order_datetime_local,
  DATE(TIMESTAMP(order_timestamp), "${dataform.projectConfig.vars.default_timezone}") as order_date_local
FROM ${ref("raw_orders")}

3.2 데이터 타입 불일치

-- 안전한 타입 변환 함수
-- includes/type_conversion.js
function safeCast(column, targetType, defaultValue = 'NULL') {
  return `
    CASE 
      WHEN ${column} IS NULL THEN ${defaultValue}
      WHEN SAFE_CAST(${column} AS ${targetType}) IS NULL THEN ${defaultValue}
      ELSE SAFE_CAST(${column} AS ${targetType})
    END
  `;
}

function safeNumeric(column, defaultValue = '0') {
  return safeCast(column, 'NUMERIC', defaultValue);
}

module.exports = { safeCast, safeNumeric };

4. 모니터링 및 알림

4.1 실패 감지 및 알림

-- definitions/monitoring/workflow_health_check.sqlx
config {
  type: "table",
  description: "워크플로우 상태 모니터링"
}

WITH recent_runs AS (
  SELECT 
    workflow_id,
    execution_time,
    status,
    error_message,
    ROW_NUMBER() OVER (PARTITION BY workflow_id ORDER BY execution_time DESC) as rn
  FROM `project.dataset.dataform_execution_logs`
  WHERE DATE(execution_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
)

SELECT 
  workflow_id,
  execution_time as last_run_time,
  status,
  error_message,
  CASE 
    WHEN status = 'FAILED' THEN 'CRITICAL'
    WHEN status = 'CANCELLED' THEN 'WARNING' 
    WHEN TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), execution_time, HOUR) > 24 THEN 'STALE'
    ELSE 'OK'
  END as health_status

FROM recent_runs
WHERE rn = 1

4.2 데이터 신선도 모니터링

-- definitions/monitoring/data_freshness.sqlx
config { type: "table" }

SELECT 
  'fact_orders' as table_name,
  MAX(order_date) as latest_data_date,
  CURRENT_DATE() as check_date,
  DATE_DIFF(CURRENT_DATE(), MAX(order_date), DAY) as days_stale,
  CASE 
    WHEN DATE_DIFF(CURRENT_DATE(), MAX(order_date), DAY) > 2 THEN 'STALE'
    WHEN DATE_DIFF(CURRENT_DATE(), MAX(order_date), DAY) > 1 THEN 'WARNING'
    ELSE 'FRESH'
  END as freshness_status
FROM ${ref("fact_orders")}

UNION ALL

SELECT 
  'dim_customers',
  MAX(DATE(effective_date)),
  CURRENT_DATE(),
  DATE_DIFF(CURRENT_DATE(), MAX(DATE(effective_date)), DAY),
  CASE 
    WHEN DATE_DIFF(CURRENT_DATE(), MAX(DATE(effective_date)), DAY) > 1 THEN 'STALE'
    ELSE 'FRESH'
  END
FROM ${ref("dim_customers")}

5. 비용 최적화

5.1 쿼리 비용 모니터링

-- definitions/monitoring/query_cost_analysis.sqlx  
config { type: "table" }

SELECT 
  DATE(creation_time) as query_date,
  user_email,
  job_type,
  COUNT(*) as query_count,
  SUM(total_bytes_processed) / 1024 / 1024 / 1024 as total_gb_processed,
  AVG(total_bytes_processed) / 1024 / 1024 / 1024 as avg_gb_per_query,
  -- BigQuery 온디맨드 가격: $5 per TB
  SUM(total_bytes_processed) / 1024 / 1024 / 1024 / 1024 * 5 as estimated_cost_usd

FROM `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE 
  DATE(creation_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
  AND job_type = 'QUERY'
  AND state = 'DONE'
  AND total_bytes_processed > 0
  
GROUP BY 1, 2, 3
ORDER BY estimated_cost_usd DESC

5.2 테이블 크기 최적화

-- definitions/monitoring/table_storage_analysis.sqlx
config { type: "table" }

SELECT 
  table_schema,
  table_name,
  ROUND(size_bytes / 1024 / 1024 / 1024, 2) as size_gb,
  row_count,
  partitioning_type,
  clustering_fields,
  ROUND(size_bytes / row_count / 1024, 2) as avg_row_size_kb,
  
  -- 파티션 정리 추천
  CASE 
    WHEN partitioning_type LIKE '%DAY%' AND size_gb > 100 THEN 'CONSIDER_PARTITION_EXPIRATION'
    WHEN clustering_fields IS NULL AND size_gb > 50 THEN 'CONSIDER_CLUSTERING'
    WHEN row_count < 1000000 AND size_gb < 1 THEN 'TOO_SMALL_FOR_PARTITIONING'
    ELSE 'OPTIMIZED'
  END as optimization_recommendation

FROM `region-us.INFORMATION_SCHEMA.TABLE_STORAGE_BY_PROJECT`
WHERE table_schema NOT IN ('INFORMATION_SCHEMA', 'sys')
ORDER BY size_gb DESC

추가 리소스

공식 문서

커뮤니티 리소스

모범 사례 가이드


이 문서는 BigQuery Dataform의 종합적인 활용 가이드입니다. 프로젝트 요구사항에 맞게 예제들을 참조하여 실제 데이터 파이프라인을 구축해보세요.