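BigQuery Dataform
Dataform Data Pipelines
Table of Contents
- Dataform Overview
- Key Concepts
- Installation and Setup
- Project Structure
- Table Definitions
- Transformations
- Dependency Management
- Testing
- Version Control and Deployment
- Monitoring and Debugging
- Practical Examples
- Best Practices
- Pricing and Cost Optimization
- Troubleshooting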
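Dataform Overview
What is Dataform?
- A data transformation workflow management tool provided by Google Cloud
- Manages complex data pipelines as SQL-based code
- Fully integrated with BigQuery for scalable data transformation
- Applies software development best practices such as version control, dependency management, and testing
Key features
- Declarative, SQL-based: define complex data transformations in plain SQL
- Automatic dependency management: detects dependencies between tables and runs them in the correct order
- Incremental processing: processes only new data to reduce cost and run time
- Testing: assertions and unit tests for validating data quality
- Git integration: source code version control and collaboration
Use cases
- ETL/ELT pipelines: transform raw data into an analysis-ready form
- Data warehouse construction: dimensional modeling and data mart creation
- Data quality management: consistent data validation and cleansing
- Reporting data preparation: shaping data for business intelligence tools
Key Concepts
1. Repository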
# dataform.json
{
"defaultSchema": "dataform_staging",
"assertionSchema": "dataform_assertions",
"defaultDatabase": "my-project",
"defaultLocation": "US"
}
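2. Workflow
- The unit of execution for data transformation work
- Execution order is determined automatically from dependencies
- Independent actions run in parallel to improve performance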
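The ordering and parallelism described above can be pictured with a small sketch. This is not Dataform's scheduler; it is an illustrative grouping of a hypothetical action graph into "waves", where every action in a wave has all of its dependencies satisfied by earlier waves. The graph contents and the function name `executionWaves` are assumptions made for the example.

```javascript
// Hypothetical action graph: each key depends on the actions listed in its array.
const deps = {
  raw_orders: [],
  raw_customers: [],
  stg_orders: ["raw_orders"],
  dim_customers: ["raw_customers"],
  fact_orders: ["stg_orders", "dim_customers"],
};

// Group actions into waves. Actions in the same wave do not depend on each
// other, so a scheduler could run them in parallel.
function executionWaves(graph) {
  const done = new Set();
  const waves = [];
  let pending = Object.keys(graph);
  while (pending.length > 0) {
    const ready = pending.filter((name) => graph[name].every((d) => done.has(d)));
    if (ready.length === 0) {
      // Nothing can make progress: the remaining actions form a cycle.
      throw new Error(`Circular dependency among: ${pending.join(", ")}`);
    }
    ready.forEach((name) => done.add(name));
    waves.push(ready);
    pending = pending.filter((name) => !done.has(name));
  }
  return waves;
}

const waves = executionWaves(deps);
console.log(waves);
```

The same loop also surfaces circular dependencies, since a cycle leaves no action ready to run.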
3. Actions
Table
-- definitions/dim_customers.sqlx
config {
type: "table",
schema: "analytics",
description: "Customer dimension table"
}
SELECT
customer_id,
customer_name,
email,
registration_date,
last_order_date
FROM ${ref("raw_customers")}
WHERE customer_id IS NOT NULL
View
-- definitions/vw_sales_summary.sqlx
config {
type: "view",
description: "Sales summary view"
}
SELECT
DATE(order_date) as order_date,
SUM(total_amount) as daily_sales,
COUNT(DISTINCT order_id) as order_count
FROM ${ref("fact_orders")}
GROUP BY DATE(order_date)
Incremental Table
-- definitions/fact_orders_incremental.sqlx
config {
type: "incremental",
uniqueKey: ["order_id"],
bigquery: {
updatePartitionFilter: "order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)"
}
}
SELECT
order_id,
customer_id,
order_date,
total_amount,
CURRENT_TIMESTAMP() as processed_at
FROM ${ref("raw_orders")}
${ when(incremental(), `WHERE order_date >= (SELECT MAX(order_date) FROM ${self()})`) }
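To make the `when(incremental(), ...)` pattern concrete, the sketch below renders the same query in both run modes. The `when` function here is a simplified stand-in written for this example, and `renderOrdersQuery` plus the table name `analytics.fact_orders_incremental` are hypothetical; the point is only that the filter appears on incremental runs and vanishes on a full refresh.

```javascript
// Simplified stand-in for when(): returns `then` if the condition holds, else `otherwise`.
function when(condition, then, otherwise = "") {
  return condition ? then : otherwise;
}

// Hypothetical helper that renders the query for either run mode.
function renderOrdersQuery(isIncremental, selfName) {
  const filter = when(
    isIncremental,
    `WHERE order_date >= (SELECT MAX(order_date) FROM ${selfName})`
  );
  return `SELECT order_id, order_date FROM raw_orders ${filter}`.trim();
}

const fullRefreshSql = renderOrdersQuery(false, "analytics.fact_orders_incremental");
const incrementalSql = renderOrdersQuery(true, "analytics.fact_orders_incremental");
console.log(fullRefreshSql);
console.log(incrementalSql);
```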
Assertion
-- definitions/assertions/assert_positive_amounts.sqlx
config {
type: "assertion",
description: "Verify that order amounts are not negative"
}
SELECT *
FROM ${ref("fact_orders")}
WHERE total_amount < 0
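4. Dependency references
-- ref(): references another action in the project and registers it as a dependency
SELECT * FROM ${ref("source_table")}
-- resolve(): returns the fully qualified table name without registering a dependency
SELECT * FROM ${resolve("external_dataset.external_table")}
-- self(): references the current table, typically inside an incremental table
WHERE date > (SELECT MAX(date) FROM ${self()})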
Installation and Setup
1. Enable Dataform in Google Cloud
Enable the API
gcloud services enable dataform.googleapis.com
Configure IAM permissions
# Create a service account
gcloud iam service-accounts create dataform-sa \
--description="Dataform service account" \
--display-name="Dataform SA"
# Grant BigQuery permissions
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:dataform-sa@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/bigquery.dataEditor"
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:dataform-sa@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/bigquery.jobUser"
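2. Set up a local development environment
Install the Dataform CLI
npm install -g @dataform/cli
Initialize a project
# Create a new project (the arguments differ between CLI versions; check `dataform help init`)
dataform init my-dataform-project
cd my-dataform-project
# Set up Google Cloud authentication
gcloud auth application-default login
Configure development environments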
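// .df/profiles.json
// Note: this per-environment profiles file is a convention of this guide; the open-source CLI itself reads BigQuery credentials from .df-credentials.json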
{
"dev": {
"projectId": "my-project-dev",
"location": "US"
},
"prod": {
"projectId": "my-project-prod",
"location": "US"
}
}
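3. Create a repository in the Google Cloud Console
Connect the repository
- Open the Dataform console
- Click "Create repository"
- Connect an existing Git repository or create a new one
- Configure the default branch and settings
Project Structure
Standard directory layout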
dataform-project/
├── .df/
│ └── profiles.json # Per-environment profile settings
├── definitions/ # All definition files
│ ├── staging/ # Staging layer
│ │ ├── stg_orders.sqlx
│ │ └── stg_customers.sqlx
│ ├── marts/ # Data marts
│ │ ├── dim_customers.sqlx
│ │ └── fact_orders.sqlx
│ ├── assertions/ # Data validation
│ │ ├── assert_unique_customers.sqlx
│ │ └── assert_positive_amounts.sqlx
│ └── sources/ # Source data declarations
├── includes/ # JavaScript functions
│ ├── constants.js
│ └── macros.js
├── dataform.json # Project settings
└── package.json # Node.js dependencies
File naming conventions
-- Staging: stg_[source]_[entity].sqlx
-- definitions/staging/stg_ecommerce_orders.sqlx
-- Dimension tables: dim_[entity].sqlx
-- definitions/marts/dim_customers.sqlx
-- Fact tables: fact_[process].sqlx
-- definitions/marts/fact_sales.sqlx
-- Assertions: assert_[condition].sqlx
-- definitions/assertions/assert_unique_order_ids.sqlx
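A convention like this is easy to check mechanically, for example in a pre-commit hook. The following is a minimal sketch, assuming the prefix-to-directory mapping implied by the examples above; `PREFIX_TO_DIR` and `followsConvention` are hypothetical names, and files with other prefixes (such as views) are simply reported as not covered.

```javascript
// Assumed mapping from file-name prefix to the directory it belongs in.
const PREFIX_TO_DIR = {
  stg: "definitions/staging",
  dim: "definitions/marts",
  fact: "definitions/marts",
  assert: "definitions/assertions",
};

// Returns true when the path has a known prefix and sits in the matching directory.
function followsConvention(path) {
  const match = /^(.*)\/([a-z]+)_[a-z0-9_]+\.sqlx$/.exec(path);
  if (!match) return false;
  const [, dir, prefix] = match;
  return PREFIX_TO_DIR[prefix] === dir;
}

console.log(followsConvention("definitions/staging/stg_ecommerce_orders.sqlx"));
console.log(followsConvention("definitions/marts/stg_orders.sqlx"));
```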
Managing configuration files
// dataform.json
{
"defaultSchema": "analytics",
"assertionSchema": "data_quality",
"defaultDatabase": "my-bigquery-project",
"defaultLocation": "US",
"vars": {
"start_date": "2023-01-01",
"source_dataset": "raw_data"
}
}
Table Definitions
1. Creating basic tables
Simple table
-- definitions/marts/dim_products.sqlx
config {
type: "table",
schema: "analytics",
description: "Product dimension table - contains all product information",
columns: {
product_id: "Unique product identifier",
product_name: "Product name",
category: "Product category",
price: "Product price",
created_at: "Record creation time"
},
bigquery: {
partitionBy: "DATE(created_at)",
clusterBy: ["category", "product_id"]
}
}
SELECT
p.product_id,
p.product_name,
c.category_name as category,
p.price,
p.created_at,
CURRENT_TIMESTAMP() as processed_at
FROM ${ref("raw_products")} p
JOIN ${ref("raw_categories")} c
ON p.category_id = c.category_id
WHERE p.is_active = true
Partitioning and clustering
-- definitions/marts/fact_sales_daily.sqlx
config {
type: "table",
bigquery: {
partitionBy: "DATE(order_date)",
partitionExpirationDays: 365,
clusterBy: ["customer_id", "product_id"],
requirePartitionFilter: true
}
}
SELECT
DATE(order_date) as order_date,
customer_id,
product_id,
SUM(quantity) as total_quantity,
SUM(total_amount) as total_sales,
COUNT(DISTINCT order_id) as order_count
FROM ${ref("stg_orders")}
WHERE DATE(order_date) >= '2023-01-01'
GROUP BY 1, 2, 3
2. Incremental tables
Basic incremental processing
-- definitions/staging/stg_events_incremental.sqlx
config {
type: "incremental",
uniqueKey: ["event_id"],
bigquery: {
partitionBy: "DATE(event_timestamp)",
updatePartitionFilter: "DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)"
}
}
SELECT
event_id,
user_id,
event_type,
event_timestamp,
event_properties,
CURRENT_TIMESTAMP() as processed_at
FROM ${ref("raw_events")}
${ when(incremental(), `
WHERE event_timestamp > (
SELECT COALESCE(MAX(event_timestamp), TIMESTAMP('1900-01-01'))
FROM ${self()}
)
`) }
Handling deletes and updates (Delete+Insert)
-- definitions/marts/dim_customers_scd2.sqlx
config {
type: "incremental",
uniqueKey: ["customer_id", "effective_date"],
bigquery: {
updatePartitionFilter: "effective_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)"
}
}
WITH source_data AS (
SELECT
customer_id,
customer_name,
email,
address,
phone,
updated_at as effective_date,
LEAD(updated_at) OVER (PARTITION BY customer_id ORDER BY updated_at) as end_date,
ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC) as rn
FROM ${ref("stg_customers")}
${ when(incremental(), `
WHERE updated_at > (
SELECT COALESCE(MAX(effective_date), DATE('1900-01-01'))
FROM ${self()}
)
`) }
)
SELECT
customer_id,
customer_name,
email,
address,
phone,
effective_date,
COALESCE(end_date, DATE('9999-12-31')) as end_date,
CASE WHEN end_date IS NULL THEN true ELSE false END as is_current
FROM source_data
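The `LEAD(updated_at)` window above is what turns a change history into validity intervals: each version ends where the next one begins, and the latest version stays open-ended. The sketch below reproduces that logic on a small in-memory history so the result is easy to inspect. The sample rows and the function name `toScd2Rows` are assumptions for illustration only.

```javascript
// Turn a change history into SCD Type 2 rows: each version is valid from its
// own update date until the next update for the same customer.
function toScd2Rows(changes) {
  const byCustomer = new Map();
  for (const row of changes) {
    if (!byCustomer.has(row.customerId)) byCustomer.set(row.customerId, []);
    byCustomer.get(row.customerId).push(row);
  }
  const result = [];
  for (const rows of byCustomer.values()) {
    // ISO dates sort correctly as plain strings.
    rows.sort((a, b) => a.updatedAt.localeCompare(b.updatedAt));
    rows.forEach((row, i) => {
      const next = rows[i + 1]; // plays the role of LEAD(updated_at)
      result.push({
        customerId: row.customerId,
        email: row.email,
        effectiveDate: row.updatedAt,
        endDate: next ? next.updatedAt : "9999-12-31",
        isCurrent: !next,
      });
    });
  }
  return result;
}

// Hypothetical change history, deliberately out of order.
const history = [
  { customerId: "c1", email: "new@example.com", updatedAt: "2024-03-01" },
  { customerId: "c1", email: "old@example.com", updatedAt: "2024-01-01" },
  { customerId: "c2", email: "x@example.com", updatedAt: "2024-02-01" },
];
const scd2Rows = toScd2Rows(history);
console.log(scd2Rows);
```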
3. View definitions
Simple view
-- definitions/marts/vw_customer_metrics.sqlx
config {
type: "view",
description: "Key metrics per customer"
}
SELECT
c.customer_id,
c.customer_name,
COUNT(DISTINCT o.order_id) as total_orders,
SUM(o.total_amount) as total_spent,
AVG(o.total_amount) as avg_order_value,
MIN(o.order_date) as first_order_date,
MAX(o.order_date) as last_order_date,
DATE_DIFF(CURRENT_DATE(), MAX(o.order_date), DAY) as days_since_last_order
FROM ${ref("dim_customers")} c
LEFT JOIN ${ref("fact_orders")} o
ON c.customer_id = o.customer_id
GROUP BY 1, 2
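Materialized view
-- definitions/marts/mv_daily_sales.sqlx
-- Note: BigQuery materialized views accept only a restricted subset of SQL
-- (for example, non-deterministic functions such as CURRENT_DATE() are rejected),
-- so check this query against the current limitations before deploying it.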
config {
type: "view",
materialized: true,
bigquery: {
partitionBy: "DATE(order_date)",
clusterBy: ["region", "category"]
}
}
SELECT
DATE(order_date) as order_date,
p.category,
c.region,
COUNT(DISTINCT o.order_id) as order_count,
COUNT(DISTINCT o.customer_id) as customer_count,
SUM(o.total_amount) as total_sales,
AVG(o.total_amount) as avg_order_value
FROM ${ref("fact_orders")} o
JOIN ${ref("dim_products")} p ON o.product_id = p.product_id
JOIN ${ref("dim_customers")} c ON o.customer_id = c.customer_id
WHERE DATE(order_date) >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY)
GROUP BY 1, 2, 3
Transformations
1. Using JavaScript macros
Defining shared macros
// includes/macros.js
// Date range filter macro
function dateFilter(column, days_back = 30) {
return `${column} >= DATE_SUB(CURRENT_DATE(), INTERVAL ${days_back} DAY)`;
}
// Pivot macro (the default aggregates the literal 1, because SUM(... THEN * END) is not valid SQL)
function pivot(column, values, agg_func = 'SUM', agg_column = '1') {
const pivotColumns = values.map(value =>
`${agg_func}(CASE WHEN ${column} = '${value}' THEN ${agg_column} END) as ${value}`
).join(',\n ');
return pivotColumns;
}
// Safe division macro
function safeDivide(numerator, denominator) {
return `SAFE_DIVIDE(${numerator}, ${denominator})`;
}
// Email masking macro
function maskEmail(email_column) {
return `
CONCAT(
LEFT(${email_column}, 3),
'***',
SUBSTR(${email_column}, STRPOS(${email_column}, '@'))
)
`;
}
module.exports = {
dateFilter,
pivot,
safeDivide,
maskEmail
};
Macro usage example
-- definitions/marts/fact_sales_pivot.sqlx
-- Functions exported from includes/macros.js are available under the file name: macros.<function>
config { type: "table" }
SELECT
order_date,
customer_id,
${macros.pivot('payment_method', ['credit_card', 'debit_card', 'cash'], 'SUM', 'total_amount')}
FROM ${ref("fact_orders")}
WHERE ${macros.dateFilter('order_date', 90)}
GROUP BY order_date, customer_id
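Because macros are ordinary JavaScript functions that return SQL text, their output can be inspected outside Dataform. The snippet below is a standalone copy of the pivot idea (camelCase parameter names are the only change) that prints the columns it would splice into the SELECT list; it is meant as a quick way to see the expansion, not as part of the project.

```javascript
// Standalone copy of the pivot macro: one aggregated CASE column per value.
function pivot(column, values, aggFunc = "SUM", aggColumn = "1") {
  return values
    .map((value) => `${aggFunc}(CASE WHEN ${column} = '${value}' THEN ${aggColumn} END) as ${value}`)
    .join(",\n");
}

const pivotSql = pivot("payment_method", ["credit_card", "cash"], "SUM", "total_amount");
console.log(pivotSql);
```

Running a macro like this under plain Node is a cheap way to catch quoting or comma mistakes before compiling the whole project.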
2. Conditional logic
Branching by environment
-- definitions/staging/stg_orders.sqlx
config {
type: "table",
schema: dataform.projectConfig.vars.target_schema
}
SELECT
order_id,
customer_id,
order_date,
total_amount,
-- Sample the data in the dev environment
${ when(dataform.projectConfig.vars.environment === 'dev', `
FROM ${ref("raw_orders")}
WHERE MOD(ABS(FARM_FINGERPRINT(CAST(order_id AS STRING))), 100) < 10
`) }
${ when(dataform.projectConfig.vars.environment !== 'dev', `
FROM ${ref("raw_orders")}
`) }
Gradual rollout
-- definitions/marts/fact_orders_v2.sqlx
config {
type: "table",
disabled: dataform.projectConfig.vars.enable_v2 !== "true" // vars values are strings
}
-- Table definition for the new version
SELECT
order_id,
customer_id,
order_date,
total_amount,
-- Newly added columns
discount_amount,
tax_amount,
net_amount
FROM ${ref("stg_orders")}
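Feature flags passed through `vars` arrive as strings, so a flag check is safer when it normalizes the value first. The helper below is a minimal sketch of that idea; `isEnabled` is a hypothetical name, and the plain objects stand in for `dataform.projectConfig.vars`.

```javascript
// Treat a flag as enabled only when it is the boolean true or the string "true"
// (case-insensitive). Missing flags count as disabled.
function isEnabled(vars, name) {
  const raw = vars[name];
  return raw === true || String(raw).toLowerCase() === "true";
}

console.log(isEnabled({ enable_v2: "true" }, "enable_v2"));
console.log(isEnabled({ enable_v2: "false" }, "enable_v2"));
console.log(isEnabled({}, "enable_v2"));
```

With such a helper in an includes file, a config could use `disabled: !isEnabled(...)` instead of comparing against a literal.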
3. Complex transformation logic
Using window functions
-- definitions/marts/customer_cohort_analysis.sqlx
config {
type: "table",
description: "Customer cohort analysis table"
}
WITH customer_orders AS (
SELECT
customer_id,
order_date,
total_amount,
ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date) as order_number,
MIN(order_date) OVER (PARTITION BY customer_id) as first_order_date
FROM ${ref("fact_orders")}
),
cohort_data AS (
SELECT
customer_id,
DATE_TRUNC(first_order_date, MONTH) as cohort_month,
DATE_DIFF(DATE_TRUNC(order_date, MONTH), DATE_TRUNC(first_order_date, MONTH), MONTH) as period_number,
total_amount
FROM customer_orders
)
SELECT
cohort_month,
period_number,
COUNT(DISTINCT customer_id) as customers,
SUM(total_amount) as revenue,
SAFE_DIVIDE(SUM(total_amount), COUNT(DISTINCT customer_id)) as avg_revenue_per_customer
FROM cohort_data
GROUP BY cohort_month, period_number
ORDER BY cohort_month, period_number
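The cohort key and `period_number` are both month arithmetic: truncate each date to its month, then count the months between the first order and the current order. A small sketch of that calculation, assuming ISO `YYYY-MM-DD` date strings and hypothetical helper names, looks like this:

```javascript
// Number of whole calendar months since year 0, ignoring the day of month
// (equivalent to truncating the date to its month first).
function monthIndex(isoDate) {
  const [year, month] = isoDate.split("-").map(Number);
  return year * 12 + (month - 1);
}

// Cohort month as YYYY-MM, taken from the customer's first order.
function cohortMonth(firstOrderDate) {
  return firstOrderDate.slice(0, 7);
}

// Months elapsed between the cohort month and the order month.
function periodNumber(firstOrderDate, orderDate) {
  return monthIndex(orderDate) - monthIndex(firstOrderDate);
}

console.log(cohortMonth("2023-01-31"), periodNumber("2023-01-31", "2023-02-01"));
```

Note that an order placed one day after the first order can still land in period 1 when it crosses a month boundary, which is the expected behavior of month-truncated cohorts.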
Processing JSON data
-- definitions/staging/stg_events_parsed.sqlx
config { type: "table" }
SELECT
event_id,
user_id,
event_timestamp,
event_type,
-- Parse JSON attributes
JSON_EXTRACT_SCALAR(event_properties, '$.page_url') as page_url,
JSON_EXTRACT_SCALAR(event_properties, '$.referrer') as referrer,
CAST(JSON_EXTRACT_SCALAR(event_properties, '$.session_duration') AS INT64) as session_duration,
-- Handle a nested JSON array
ARRAY(
SELECT JSON_EXTRACT_SCALAR(item, '$.product_id')
FROM UNNEST(JSON_EXTRACT_ARRAY(event_properties, '$.products')) as item
) as product_ids,
-- Conditional JSON parsing
CASE event_type
WHEN 'purchase' THEN CAST(JSON_EXTRACT_SCALAR(event_properties, '$.amount') AS FLOAT64)
ELSE NULL
END as purchase_amount
FROM ${ref("raw_events")}
WHERE JSON_EXTRACT_SCALAR(event_properties, '$.valid') = 'true'
Dependency Management
1. Understanding the dependency graph
Defining dependencies with ref()
-- definitions/marts/customer_lifetime_value.sqlx
config {
type: "table",
dependencies: ["dim_customers", "fact_orders"] // explicit declaration (ref() below already registers both)
}
SELECT
c.customer_id,
c.customer_name,
c.acquisition_date,
-- Columns from the tables this action depends on
o.total_orders,
o.total_spent,
o.avg_order_value,
-- CLV calculation
o.total_spent * 1.2 as predicted_ltv
FROM ${ref("dim_customers")} c
JOIN (
SELECT
customer_id,
COUNT(*) as total_orders,
SUM(total_amount) as total_spent,
AVG(total_amount) as avg_order_value
FROM ${ref("fact_orders")}
GROUP BY customer_id
) o ON c.customer_id = o.customer_id
2. Resolving circular dependencies
Temporary tables with pre-operations
-- definitions/staging/stg_customer_metrics.sqlx
config {
type: "table"
}
-- pre_operations run before the main query, in the same BigQuery script
pre_operations {
CREATE TEMP TABLE customer_order_counts AS
SELECT customer_id, COUNT(*) as order_count
FROM ${ref("raw_orders")}
GROUP BY customer_id
}
SELECT
c.*,
COALESCE(t.order_count, 0) as total_orders
FROM ${ref("raw_customers")} c
LEFT JOIN customer_order_counts t
ON c.customer_id = t.customer_id
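3. Conditional execution
Running only under specific conditions
-- definitions/marts/weekend_sales_report.sqlx
config {
type: "table",
// Evaluated at compile time: the action is enabled only when compiled on a weekend
disabled: new Date().getDay() !== 0 && new Date().getDay() !== 6
}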
SELECT
DATE(order_date) as weekend_date,
SUM(total_amount) as weekend_sales
FROM ${ref("fact_orders")}
WHERE EXTRACT(DAYOFWEEK FROM order_date) IN (1, 7) -- Sunday, Saturday
AND DATE(order_date) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY DATE(order_date)
Checking whether data exists
// includes/utils.js
function tableHasData(tableName) {
return `(SELECT COUNT(*) FROM ${tableName}) > 0`;
}
module.exports = { tableHasData };
-- definitions/marts/incremental_report.sqlx
-- `disabled` is resolved at compile time and cannot depend on query results,
-- so the existence check is applied inside the query instead.
config {
type: "table"
}
SELECT * FROM ${ref("stg_daily_updates")}
WHERE ${utils.tableHasData(ref("stg_daily_updates"))}
Testing
1. Data validation (Assertions)
Basic assertions
-- definitions/assertions/assert_unique_customer_ids.sqlx
config {
type: "assertion",
description: "Verify that customer IDs are not duplicated"
}
SELECT customer_id
FROM ${ref("dim_customers")}
GROUP BY customer_id
HAVING COUNT(*) > 1
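An assertion passes when its query returns no rows, so a uniqueness check always has the same shape: group by the key and keep groups with more than one row. When many tables need the same check, that SQL can be generated from an includes file. The following is a sketch under that assumption; `uniqueKeyAssertionSql` and the table name passed to it are hypothetical.

```javascript
// Build a uniqueness-check query: any row returned is a duplicated key.
function uniqueKeyAssertionSql(tableRef, keyColumns) {
  const keys = keyColumns.join(", ");
  return [
    `SELECT ${keys}, COUNT(*) AS row_count`,
    `FROM ${tableRef}`,
    `GROUP BY ${keys}`,
    `HAVING COUNT(*) > 1`,
  ].join("\n");
}

const assertionSql = uniqueKeyAssertionSql("analytics.dim_customers", ["customer_id", "effective_date"]);
console.log(assertionSql);
```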
Validating complex business rules
-- definitions/assertions/assert_order_business_rules.sqlx
config {
type: "assertion",
description: "Validate business rules for order data"
}
-- Each branch casts the offending value to STRING so the UNION ALL column types match
SELECT
order_id,
'negative_amount' as violation_type,
CAST(total_amount AS STRING) as violation_value
FROM ${ref("fact_orders")}
WHERE total_amount < 0
UNION ALL
SELECT
order_id,
'future_date' as violation_type,
CAST(order_date AS STRING) as violation_value
FROM ${ref("fact_orders")}
WHERE DATE(order_date) > CURRENT_DATE()
UNION ALL
SELECT
o.order_id,
'missing_customer' as violation_type,
CAST(o.customer_id AS STRING) as violation_value
FROM ${ref("fact_orders")} o
LEFT JOIN ${ref("dim_customers")} c
ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL
Data quality metrics
-- definitions/assertions/data_quality_metrics.sqlx
config {
type: "assertion",
description: "Validate data quality thresholds"
}
WITH quality_metrics AS (
SELECT
'customers' as table_name,
COUNT(*) as total_rows,
COUNT(DISTINCT customer_id) as unique_customers,
COUNTIF(email IS NULL OR email = '') as missing_emails,
COUNTIF(NOT REGEXP_CONTAINS(email, r'^[^@]+@[^@]+\.[^@]+$')) as invalid_emails
FROM ${ref("dim_customers")}
)
SELECT *
FROM quality_metrics
WHERE
-- Duplicate rate above 5%
SAFE_DIVIDE(total_rows - unique_customers, total_rows) > 0.05
-- Missing-email rate above 10%
OR SAFE_DIVIDE(missing_emails, total_rows) > 0.10
-- Invalid-email rate above 2%
OR SAFE_DIVIDE(invalid_emails, total_rows) > 0.02
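The three thresholds are simple ratios over the row count, which makes them easy to sanity-check by hand. The sketch below applies the same limits to hypothetical counts and reports which ones are exceeded. The function name and the `empty_table` result for a zero-row table are assumptions of this example, not part of the assertion above.

```javascript
// Limits taken from the assertion: 5% duplicates, 10% missing emails, 2% invalid emails.
const LIMITS = { duplicate_rate: 0.05, missing_email_rate: 0.10, invalid_email_rate: 0.02 };

function qualityViolations({ totalRows, uniqueCustomers, missingEmails, invalidEmails }) {
  // Assumption: an empty table is reported separately instead of dividing by zero.
  if (totalRows === 0) return ["empty_table"];
  const rates = {
    duplicate_rate: (totalRows - uniqueCustomers) / totalRows,
    missing_email_rate: missingEmails / totalRows,
    invalid_email_rate: invalidEmails / totalRows,
  };
  return Object.keys(LIMITS).filter((name) => rates[name] > LIMITS[name]);
}

// 1000 rows with 940 distinct customers: 60 / 1000 = 6% duplicates, above the 5% limit.
const violations = qualityViolations({
  totalRows: 1000,
  uniqueCustomers: 940,
  missingEmails: 50,
  invalidEmails: 10,
});
console.log(violations);
```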
2. Unit tests
Test utilities written in JavaScript
// includes/test_utils.js
// Builds a CREATE TABLE statement from an array of row objects.
// Each row becomes a STRUCT with named fields so that UNNEST yields named columns.
function createTestData(tableName, data) {
const rows = data.map(row =>
`STRUCT(${Object.entries(row).map(([column, v]) =>
`${typeof v === 'string' ? `'${v}'` : v} AS ${column}`).join(', ')})`
).join(',\n');
return `
CREATE OR REPLACE TABLE ${tableName} AS
SELECT * FROM UNNEST([
${rows}
])
`;
}
function assertEqual(actual, expected, message) {
return `
SELECT
'${message}' as test_name,
${actual} as actual_value,
${expected} as expected_value,
CASE WHEN ${actual} = ${expected} THEN 'PASS' ELSE 'FAIL' END as result
`;
}
module.exports = { createTestData, assertEqual };
3. Regression tests
Detecting data changes
-- definitions/assertions/detect_data_regression.sqlx
config {
type: "assertion",
description: "Detect day-over-day changes in key metrics"
}
WITH current_metrics AS (
SELECT
CURRENT_DATE() as metric_date,
COUNT(DISTINCT customer_id) as active_customers,
COUNT(DISTINCT order_id) as total_orders,
SUM(total_amount) as total_revenue
FROM ${ref("fact_orders")}
WHERE DATE(order_date) = CURRENT_DATE()
),
previous_metrics AS (
SELECT
DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) as metric_date,
COUNT(DISTINCT customer_id) as active_customers,
COUNT(DISTINCT order_id) as total_orders,
SUM(total_amount) as total_revenue
FROM ${ref("fact_orders")}
WHERE DATE(order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
)
SELECT
'regression_detected' as alert_type,
c.metric_date,
c.active_customers as current_customers,
p.active_customers as previous_customers,
SAFE_DIVIDE(ABS(c.active_customers - p.active_customers), p.active_customers) as customer_change_pct
FROM current_metrics c
CROSS JOIN previous_metrics p
WHERE
-- Customer count changed by more than 50%
SAFE_DIVIDE(ABS(c.active_customers - p.active_customers), p.active_customers) > 0.5
-- Or revenue changed by more than 70%
OR SAFE_DIVIDE(ABS(c.total_revenue - p.total_revenue), p.total_revenue) > 0.7
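The regression check compares each metric with the previous day's value as a relative change. A worked version of that arithmetic, with hypothetical metric values and function names, is sketched below; a previous value of zero yields no ratio at all, so it does not trigger the alert.

```javascript
// Relative change versus the previous value; null when there is no baseline.
function relativeChange(current, previous) {
  if (previous === 0) return null;
  return Math.abs(current - previous) / previous;
}

// Thresholds from the assertion: more than 50% for customers, more than 70% for revenue.
function isRegression(current, previous) {
  const customerChange = relativeChange(current.activeCustomers, previous.activeCustomers);
  const revenueChange = relativeChange(current.totalRevenue, previous.totalRevenue);
  return (
    (customerChange !== null && customerChange > 0.5) ||
    (revenueChange !== null && revenueChange > 0.7)
  );
}

// Customers dropped from 100 to 40: |40 - 100| / 100 = 0.6, above the 0.5 threshold.
const alert = isRegression(
  { activeCustomers: 40, totalRevenue: 1000 },
  { activeCustomers: 100, totalRevenue: 1200 }
);
console.log(alert);
```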
Version Control and Deployment
1. Git-based workflow
Branching strategy
# Create a feature branch
git checkout -b feature/customer-segmentation
# Develop and test
dataform compile
dataform test
# Commit the changes
git add .
git commit -m "Add customer segmentation tables"
# Open a pull request, then merge
git checkout main
git pull origin main
Deploying per environment
# Note: available flags vary by CLI version; confirm them with `dataform help run`
# Deploy to the development environment
dataform run --profile=dev --vars=environment=dev
# Deploy to the staging environment
dataform run --profile=staging --vars=environment=staging
# Deploy to production (selected tags only)
dataform run --profile=prod --vars=environment=prod --tags=production
2. CI/CD pipeline
GitHub Actions workflow
# .github/workflows/dataform.yml
name: Dataform CI/CD
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
  release:
    types: [published]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '20'
      - name: Install dependencies
        run: npm install
      - name: Compile Dataform
        run: npx dataform compile
      - name: Run tests
        run: npx dataform test --profile=ci
  deploy-staging:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Install dependencies
        run: npm install
      - name: Deploy to staging
        run: npx dataform run --profile=staging
  deploy-production:
    # Depends only on test: the staging job is skipped on release events
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'release'
    steps:
      - uses: actions/checkout@v4
      - name: Install dependencies
        run: npm install
      - name: Deploy to production
        run: npx dataform run --profile=prod --tags=production
3. Gradual deployment strategies
Blue-green deployment
-- definitions/staging/stg_orders_v2.sqlx
config {
type: "table",
schema: "staging_v2", // deploy the new version to a separate schema
disabled: dataform.projectConfig.vars.enable_v2 !== "true" // vars values are strings
}
-- Improved staging logic
SELECT
order_id,
customer_id,
order_date,
total_amount,
-- New data quality cleanup
CASE
WHEN total_amount < 0 THEN 0
ELSE total_amount
END as cleaned_total_amount
FROM ${ref("raw_orders")}
WHERE order_date >= '2023-01-01'
Canary deployment
// includes/deployment.js
const CANARY_PERCENTAGE = dataform.projectConfig.vars.canary_percentage || 5;
function isCanaryCustomer(customerIdColumn) {
return `MOD(ABS(FARM_FINGERPRINT(CAST(${customerIdColumn} AS STRING))), 100) < ${CANARY_PERCENTAGE}`;
}
module.exports = { isCanaryCustomer };
-- definitions/marts/fact_orders_canary.sqlx
config {
type: "table",
disabled: dataform.projectConfig.vars.enable_canary !== "true" // vars values are strings
}
SELECT *
FROM ${ref("stg_orders_v2")}
WHERE ${isCanaryCustomer("customer_id")}
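The canary filter works because hashing is deterministic: a given customer always falls into the same bucket from 0 to 99, so the canary group is stable across runs and only grows as the percentage is raised. The sketch below shows the idea in plain JavaScript. It uses a 32-bit FNV-1a hash as a stand-in, so its buckets will not match those produced by BigQuery's `FARM_FINGERPRINT`; the function names are hypothetical.

```javascript
// 32-bit FNV-1a hash, used here only as a stand-in for FARM_FINGERPRINT.
function fnv1a(text) {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep the value an unsigned 32-bit integer
  }
  return hash;
}

// A customer is in the canary group when its bucket (0-99) is below the percentage.
function isCanary(customerId, percentage = 5) {
  return fnv1a(String(customerId)) % 100 < percentage;
}

const ids = ["c-1001", "c-1002", "c-1003", "c-1004"];
console.log(ids.map((id) => [id, isCanary(id, 50)]));
```

Raising the percentage never removes a customer from the canary group, which keeps comparisons between rollout stages consistent.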
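Monitoring and Debugging
1. Monitoring executions
Analyzing logs with Cloud Logging
-- Example query over execution logs (assumes the logs are exported to the BigQuery table below)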
SELECT
timestamp,
severity,
jsonPayload.workflow_id,
jsonPayload.action_name,
jsonPayload.status,
jsonPayload.execution_time_ms
FROM `project.dataset.dataform_logs`
WHERE DATE(timestamp) = CURRENT_DATE()
AND jsonPayload.status = 'FAILED'
ORDER BY timestamp DESC
Alerting
# monitoring/alerting-policy.yaml
displayName: "Dataform Workflow Failures"
combiner: OR
conditions:
  - displayName: "Workflow failure rate"
    conditionThreshold:
      filter: 'resource.type="dataform_workflow"'
      comparison: COMPARISON_GT
      thresholdValue: 0.1
      duration: 300s
notificationChannels:
  - "projects/PROJECT_ID/notificationChannels/CHANNEL_ID"
2. Performance optimization
Query performance analysis
-- definitions/monitoring/query_performance_analysis.sqlx
config {
type: "table",
description: "Dataform query performance analysis"
}
SELECT
job_id,
user_email,
project_id,
creation_time,
start_time,
end_time,
TIMESTAMP_DIFF(end_time, start_time, SECOND) as execution_duration_sec,
total_bytes_processed / 1024 / 1024 / 1024 as gb_processed,
total_slot_ms / 1000 / 60 as slot_minutes,
query
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE DATE(creation_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
AND job_type = 'QUERY'
AND state = 'DONE'
AND CONTAINS_SUBSTR(query, 'dataform')
ORDER BY execution_duration_sec DESC
LIMIT 100
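Monitoring resource usage
-- definitions/monitoring/resource_usage_daily.sqlx
config {
type: "incremental",
// The key must cover every grouped column to identify a row uniquely
uniqueKey: ["usage_date", "project_id", "service_name", "sku_description", "currency"]
}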
SELECT
DATE(usage_start_time) as usage_date,
project.id as project_id,
service.description as service_name,
sku.description as sku_description,
SUM(usage.amount) as total_usage,
SUM(cost) as total_cost_usd,
currency
FROM `project.dataset.gcp_billing_export_v1_BILLING_ACCOUNT_ID`
WHERE DATE(usage_start_time) >= CURRENT_DATE() - 7
AND service.description LIKE '%BigQuery%'
${ when(incremental(), `
AND DATE(usage_start_time) > (SELECT MAX(usage_date) FROM ${self()})
`) }
GROUP BY 1, 2, 3, 4, 7
3. Tracking data lineage
Collecting metadata
-- definitions/monitoring/data_lineage.sqlx
config {
type: "table",
description: "Data lineage metadata"
}
WITH table_dependencies AS (
SELECT
'fact_orders' as target_table,
['stg_orders', 'dim_customers', 'dim_products'] as source_tables
UNION ALL
SELECT
'dim_customers',
['raw_customers', 'raw_customer_addresses']
-- ... other table dependencies
)
SELECT
target_table,
source_table,
'dataform' as transformation_tool,
CURRENT_TIMESTAMP() as last_updated
FROM table_dependencies
CROSS JOIN UNNEST(source_tables) as source_table
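The `CROSS JOIN UNNEST` step flattens one row per target table into one row per (target, source) edge. The same flattening in JavaScript, using the dependency map from the query as sample data and a hypothetical function name, looks like this:

```javascript
// Dependency map copied from the example query above.
const tableDependencies = {
  fact_orders: ["stg_orders", "dim_customers", "dim_products"],
  dim_customers: ["raw_customers", "raw_customer_addresses"],
};

// Flatten the map into one edge per (target, source) pair, like UNNEST does.
function toLineageEdges(dependencies) {
  return Object.entries(dependencies).flatMap(([targetTable, sources]) =>
    sources.map((sourceTable) => ({ targetTable, sourceTable }))
  );
}

const lineageEdges = toLineageEdges(tableDependencies);
console.log(lineageEdges);
```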
Column-level lineage
-- definitions/monitoring/column_lineage.sqlx
config { type: "table" }
SELECT
'fact_orders' as target_table,
'customer_id' as target_column,
'stg_orders' as source_table,
'customer_id' as source_column,
'direct' as transformation_type
UNION ALL
SELECT
'fact_orders',
'total_amount_with_tax',
'stg_orders',
'total_amount',
'calculated' -- total_amount * (1 + tax_rate)
-- ... additional column lineage mappings
Practical Examples
1. E-commerce data pipeline
Staging raw data
-- definitions/staging/stg_ecommerce_orders.sqlx
config {
type: "table",
description: "E-commerce order data staging",
bigquery: {
partitionBy: "DATE(order_timestamp)",
clusterBy: ["customer_id", "status"]
}
}
SELECT
-- Basic order information
CAST(order_id AS STRING) as order_id,
CAST(customer_id AS STRING) as customer_id,
order_timestamp,
DATE(order_timestamp) as order_date,
-- Normalize order status
CASE status
WHEN 'completed' THEN 'COMPLETED'
WHEN 'cancelled' THEN 'CANCELLED'
WHEN 'pending' THEN 'PENDING'
WHEN 'shipped' THEN 'SHIPPED'
ELSE 'UNKNOWN'
END as order_status,
-- Clean amount fields
CAST(subtotal AS FLOAT64) as subtotal,
CAST(tax_amount AS FLOAT64) as tax_amount,
CAST(shipping_cost AS FLOAT64) as shipping_cost,
CAST(subtotal AS FLOAT64) + CAST(tax_amount AS FLOAT64) + CAST(shipping_cost AS FLOAT64) as total_amount,
-- Discount information
COALESCE(CAST(discount_amount AS FLOAT64), 0) as discount_amount,
discount_code,
-- Shipping information
shipping_address,
REGEXP_EXTRACT(shipping_address, r', ([A-Z]{2}) \d{5}') as shipping_state,
-- Metadata
created_at,
updated_at,
CURRENT_TIMESTAMP() as processed_at
FROM ${resolve(dataform.projectConfig.vars.source_dataset + '.raw_orders')}
WHERE
order_timestamp IS NOT NULL
AND customer_id IS NOT NULL
AND CAST(subtotal AS FLOAT64) >= 0
Staging order items
-- definitions/staging/stg_order_items.sqlx
config {
type: "incremental",
uniqueKey: ["order_id", "product_id"],
bigquery: {
partitionBy: "DATE(order_timestamp)"
}
}
SELECT
CAST(order_id AS STRING) as order_id,
CAST(product_id AS STRING) as product_id,
order_timestamp,
-- Quantity and price information
CAST(quantity AS INT64) as quantity,
CAST(unit_price AS FLOAT64) as unit_price,
CAST(quantity AS INT64) * CAST(unit_price AS FLOAT64) as line_total,
-- Apply discounts
COALESCE(CAST(item_discount AS FLOAT64), 0) as item_discount,
(CAST(quantity AS INT64) * CAST(unit_price AS FLOAT64)) - COALESCE(CAST(item_discount AS FLOAT64), 0) as line_total_after_discount,
CURRENT_TIMESTAMP() as processed_at
FROM ${resolve(dataform.projectConfig.vars.source_dataset + '.raw_order_items')}
WHERE
order_id IS NOT NULL
AND product_id IS NOT NULL
AND CAST(quantity AS INT64) > 0
AND CAST(unit_price AS FLOAT64) >= 0
${ when(incremental(), `
AND order_timestamp > (
SELECT COALESCE(MAX(order_timestamp), TIMESTAMP('1900-01-01'))
FROM ${self()}
)
`) }
Customer dimension table
-- definitions/marts/dim_customers.sqlx
config {
type: "table",
description: "Customer dimension table - SCD Type 2",
bigquery: {
clusterBy: ["customer_id", "is_active"]
}
}
WITH customer_history AS (
SELECT
CAST(customer_id AS STRING) as customer_id,
email,
first_name,
last_name,
phone,
date_of_birth,
gender,
-- Address information
address_line1,
address_line2,
city,
state,
zip_code,
country,
-- Account status
account_status,
registration_date,
last_login_date,
-- Change history tracking
updated_at,
LAG(updated_at) OVER (PARTITION BY customer_id ORDER BY updated_at) as previous_update,
LEAD(updated_at) OVER (PARTITION BY customer_id ORDER BY updated_at) as next_update,
ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC) as latest_record
FROM ${resolve(dataform.projectConfig.vars.source_dataset + '.raw_customers')}
WHERE customer_id IS NOT NULL
)
SELECT
-- Customer identifier
customer_id,
-- Personal information
email,
CONCAT(first_name, ' ', last_name) as full_name,
first_name,
last_name,
phone,
date_of_birth,
gender,
-- Age calculation
DATE_DIFF(CURRENT_DATE(), date_of_birth, YEAR) as age,
CASE
WHEN DATE_DIFF(CURRENT_DATE(), date_of_birth, YEAR) < 25 THEN '18-24'
WHEN DATE_DIFF(CURRENT_DATE(), date_of_birth, YEAR) < 35 THEN '25-34'
WHEN DATE_DIFF(CURRENT_DATE(), date_of_birth, YEAR) < 45 THEN '35-44'
WHEN DATE_DIFF(CURRENT_DATE(), date_of_birth, YEAR) < 55 THEN '45-54'
ELSE '55+'
END as age_group,
-- Address information
address_line1,
address_line2,
city,
state,
zip_code,
country,
-- Account information
account_status,
registration_date,
last_login_date,
-- SCD Type 2 columns
updated_at as effective_date,
COALESCE(next_update, TIMESTAMP('2999-12-31')) as end_date,
CASE WHEN latest_record = 1 THEN true ELSE false END as is_active,
-- Metadata
CURRENT_TIMESTAMP() as processed_at
FROM customer_history
Product dimension table
-- definitions/marts/dim_products.sqlx
config {
type: "table",
description: "Product dimension table",
bigquery: {
clusterBy: ["category_id", "is_active"]
}
}
SELECT
-- Product identifier
CAST(p.product_id AS STRING) as product_id,
p.product_name,
p.product_description,
p.sku,
-- Category information
CAST(p.category_id AS STRING) as category_id,
c.category_name,
c.parent_category_id,
c.category_hierarchy,
-- Brand information
CAST(p.brand_id AS STRING) as brand_id,
b.brand_name,
-- Price information
CAST(p.unit_price AS FLOAT64) as unit_price,
CAST(p.cost AS FLOAT64) as unit_cost,
CAST(p.unit_price AS FLOAT64) - CAST(p.cost AS FLOAT64) as unit_margin,
SAFE_DIVIDE(CAST(p.unit_price AS FLOAT64) - CAST(p.cost AS FLOAT64), CAST(p.unit_price AS FLOAT64)) as margin_percentage,
-- Product attributes
p.color,
p.size,
p.weight,
p.dimensions,
-- Inventory information
CAST(p.stock_quantity AS INT64) as current_stock,
CAST(p.reorder_level AS INT64) as reorder_level,
CASE
-- Check for zero stock first; otherwise the LOW_STOCK branch would capture it
WHEN CAST(p.stock_quantity AS INT64) = 0 THEN 'OUT_OF_STOCK'
WHEN CAST(p.stock_quantity AS INT64) <= CAST(p.reorder_level AS INT64) THEN 'LOW_STOCK'
ELSE 'IN_STOCK'
END as stock_status,
-- Status information
p.is_active,
p.launch_date,
p.discontinued_date,
-- Metadata
p.created_at,
p.updated_at,
CURRENT_TIMESTAMP() as processed_at
FROM ${resolve(dataform.projectConfig.vars.source_dataset + '.raw_products')} p
LEFT JOIN ${resolve(dataform.projectConfig.vars.source_dataset + '.raw_categories')} c
ON p.category_id = c.category_id
LEFT JOIN ${resolve(dataform.projectConfig.vars.source_dataset + '.raw_brands')} b
ON p.brand_id = b.brand_id
WHERE p.product_id IS NOT NULL
Order fact table
-- definitions/marts/fact_orders.sqlx
config {
type: "incremental",
uniqueKey: ["order_id"],
bigquery: {
partitionBy: "order_date",
clusterBy: ["customer_id", "order_status"]
}
}
SELECT
-- Order identifier
o.order_id,
-- Customer foreign key
o.customer_id,
-- Date information
o.order_timestamp,
o.order_date,
EXTRACT(YEAR FROM o.order_date) as order_year,
EXTRACT(MONTH FROM o.order_date) as order_month,
EXTRACT(DAYOFWEEK FROM o.order_date) as day_of_week,
CASE EXTRACT(DAYOFWEEK FROM o.order_date)
WHEN 1 THEN 'Sunday'
WHEN 2 THEN 'Monday'
WHEN 3 THEN 'Tuesday'
WHEN 4 THEN 'Wednesday'
WHEN 5 THEN 'Thursday'
WHEN 6 THEN 'Friday'
WHEN 7 THEN 'Saturday'
END as day_name,
-- Order status
o.order_status,
-- Amount information
o.subtotal,
o.tax_amount,
o.shipping_cost,
o.discount_amount,
o.total_amount,
o.total_amount - o.discount_amount as net_amount,
-- Discount information
o.discount_code,
CASE WHEN o.discount_amount > 0 THEN true ELSE false END as has_discount,
SAFE_DIVIDE(o.discount_amount, o.subtotal) as discount_percentage,
-- Shipping information
o.shipping_address,
o.shipping_state,
-- Order item aggregates
oi.total_items,
oi.total_quantity,
oi.avg_item_price,
-- Customer information (snapshot)
c.customer_name,
c.customer_email,
c.customer_age_group,
c.customer_state,
c.customer_registration_date,
-- Derived metrics
DATE_DIFF(o.order_date, c.customer_registration_date, DAY) as days_since_registration,
CASE
WHEN DATE_DIFF(o.order_date, c.customer_registration_date, DAY) <= 30 THEN 'NEW_CUSTOMER'
WHEN DATE_DIFF(o.order_date, c.customer_registration_date, DAY) <= 365 THEN 'RETURNING_CUSTOMER'
ELSE 'LOYAL_CUSTOMER'
END as customer_segment,
-- Metadata
o.processed_at
FROM ${ref("stg_ecommerce_orders")} o
-- Join order item aggregates
LEFT JOIN (
SELECT
order_id,
COUNT(DISTINCT product_id) as total_items,
SUM(quantity) as total_quantity,
AVG(unit_price) as avg_item_price
FROM ${ref("stg_order_items")}
GROUP BY order_id
) oi ON o.order_id = oi.order_id
-- Join customer information (currently active record)
LEFT JOIN (
SELECT
customer_id,
full_name as customer_name,
email as customer_email,
age_group as customer_age_group,
state as customer_state,
registration_date as customer_registration_date
FROM ${ref("dim_customers")}
WHERE is_active = true
) c ON o.customer_id = c.customer_id
${ when(incremental(), `
WHERE o.order_timestamp > (
SELECT COALESCE(MAX(order_timestamp), TIMESTAMP('1900-01-01'))
FROM ${self()}
)
`) }
2. Marketing performance analysis
Marketing campaign performance mart
-- definitions/marts/marketing_campaign_performance.sqlx
config {
type: "table",
description: "Performance analysis by marketing campaign",
bigquery: {
partitionBy: "DATE(campaign_date)",
clusterBy: ["campaign_id", "channel"]
}
}
WITH campaign_metrics AS (
SELECT
c.campaign_id,
c.campaign_name,
c.channel,
c.campaign_type,
DATE(c.start_date) as campaign_date,
c.budget_amount,
-- Ad spend
SUM(ad.spend_amount) as total_spend,
SUM(ad.impressions) as total_impressions,
SUM(ad.clicks) as total_clicks,
SAFE_DIVIDE(SUM(ad.clicks), SUM(ad.impressions)) as ctr,
SAFE_DIVIDE(SUM(ad.spend_amount), SUM(ad.clicks)) as cpc,
-- Conversion metrics (customers come from the campaign_customers alias cc)
-- Note: joining ad spend and orders in one pass can multiply rows and inflate the SUMs; pre-aggregate each source if their grains differ
COUNT(DISTINCT CASE WHEN o.order_id IS NOT NULL THEN cc.customer_id END) as converted_customers,
COUNT(DISTINCT o.order_id) as total_orders,
SUM(o.total_amount) as total_revenue,
-- Derived metrics
SAFE_DIVIDE(COUNT(DISTINCT CASE WHEN o.order_id IS NOT NULL THEN cc.customer_id END),
COUNT(DISTINCT cc.customer_id)) as conversion_rate,
SAFE_DIVIDE(SUM(o.total_amount), SUM(ad.spend_amount)) as roas,
SAFE_DIVIDE(SUM(ad.spend_amount), COUNT(DISTINCT CASE WHEN o.order_id IS NOT NULL THEN cc.customer_id END)) as cac
FROM ${resolve(dataform.projectConfig.vars.source_dataset + '.campaigns')} c
LEFT JOIN ${resolve(dataform.projectConfig.vars.source_dataset + '.ad_spend')} ad
ON c.campaign_id = ad.campaign_id
LEFT JOIN ${resolve(dataform.projectConfig.vars.source_dataset + '.campaign_customers')} cc
ON c.campaign_id = cc.campaign_id
LEFT JOIN ${ref("fact_orders")} o
ON cc.customer_id = o.customer_id
AND DATE(o.order_date) BETWEEN DATE(c.start_date) AND DATE(c.end_date)
GROUP BY 1, 2, 3, 4, 5, 6
)
SELECT
*,
-- Performance grade
CASE
WHEN roas >= 4.0 THEN 'EXCELLENT'
WHEN roas >= 2.0 THEN 'GOOD'
WHEN roas >= 1.0 THEN 'BREAK_EVEN'
ELSE 'POOR'
END as performance_grade,
-- Budget efficiency
SAFE_DIVIDE(total_spend, budget_amount) as budget_utilization,
budget_amount - total_spend as remaining_budget
FROM campaign_metrics
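The campaign KPIs reduce to a few ratios: ROAS is revenue divided by spend, CAC is spend divided by converted customers, and the grade is a threshold lookup on ROAS. The sketch below works through those formulas with hypothetical numbers. `safeDivide` imitates `SAFE_DIVIDE` by returning null instead of failing on a zero denominator, and a null ROAS falls through to `POOR`, as it would in the SQL CASE.

```javascript
// Mirrors SAFE_DIVIDE: null when the denominator is zero or a value is missing.
function safeDivide(numerator, denominator) {
  if (numerator == null || denominator == null || denominator === 0) return null;
  return numerator / denominator;
}

function campaignKpis({ spend, revenue, convertedCustomers }) {
  const roas = safeDivide(revenue, spend);
  const cac = safeDivide(spend, convertedCustomers);
  let grade = "POOR";
  if (roas !== null) {
    if (roas >= 4.0) grade = "EXCELLENT";
    else if (roas >= 2.0) grade = "GOOD";
    else if (roas >= 1.0) grade = "BREAK_EVEN";
  }
  return { roas, cac, grade };
}

// 1500 revenue on 500 spend gives ROAS 3; 500 spend over 25 customers gives CAC 20.
const kpis = campaignKpis({ spend: 500, revenue: 1500, convertedCustomers: 25 });
console.log(kpis);
```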
3. Customer segmentation and LTV analysis
Customer segmentation model
-- definitions/marts/customer_segmentation.sqlx
config {
type: "table",
description: "RFM-based customer segmentation",
bigquery: {
clusterBy: ["segment", "customer_id"]
}
}
WITH customer_rfm AS (
SELECT
c.customer_id,
c.full_name as customer_name,
c.registration_date,
-- Recency: days since the last purchase
DATE_DIFF(CURRENT_DATE(), MAX(o.order_date), DAY) as recency_days,
-- Frequency: total number of orders
COUNT(DISTINCT o.order_id) as frequency,
-- Monetary: total purchase amount
SUM(o.total_amount) as monetary,
-- Additional metrics
AVG(o.total_amount) as avg_order_value,
MIN(o.order_date) as first_order_date,
MAX(o.order_date) as last_order_date,
DATE_DIFF(MAX(o.order_date), MIN(o.order_date), DAY) + 1 as customer_lifetime_days
FROM ${ref("dim_customers")} c
LEFT JOIN ${ref("fact_orders")} o
ON c.customer_id = o.customer_id
AND o.order_status = 'COMPLETED'
WHERE c.is_active = true
GROUP BY 1, 2, 3
),
rfm_scores AS (
SELECT
*,
-- RFM scores (scale of 1 to 5)
CASE
WHEN recency_days <= 30 THEN 5
WHEN recency_days <= 90 THEN 4
WHEN recency_days <= 180 THEN 3
WHEN recency_days <= 365 THEN 2
ELSE 1
END as r_score,
CASE
WHEN frequency >= 10 THEN 5
WHEN frequency >= 5 THEN 4
WHEN frequency >= 3 THEN 3
WHEN frequency >= 2 THEN 2
ELSE 1
END as f_score,
CASE
WHEN monetary >= 1000 THEN 5
WHEN monetary >= 500 THEN 4
WHEN monetary >= 250 THEN 3
WHEN monetary >= 100 THEN 2
ELSE 1
END as m_score
FROM customer_rfm
)
SELECT
customer_id,
customer_name,
registration_date,
recency_days,
frequency,
monetary,
avg_order_value,
first_order_date,
last_order_date,
customer_lifetime_days,
-- RFM scores
r_score,
f_score,
m_score,
CONCAT(CAST(r_score AS STRING), CAST(f_score AS STRING), CAST(m_score AS STRING)) as rfm_code,
-- Segment assignment (the first matching rule wins, so narrower rules come before broader ones)
CASE
WHEN r_score >= 4 AND f_score >= 4 AND m_score >= 4 THEN 'CHAMPIONS'
WHEN r_score >= 3 AND f_score >= 4 AND m_score >= 4 THEN 'LOYAL_CUSTOMERS'
WHEN r_score >= 4 AND f_score <= 2 AND m_score >= 3 THEN 'POTENTIAL_LOYALISTS'
WHEN r_score >= 4 AND f_score <= 2 AND m_score <= 2 THEN 'NEW_CUSTOMERS'
WHEN r_score >= 3 AND f_score >= 3 AND m_score >= 3 THEN 'PROMISING'
WHEN r_score >= 3 AND f_score <= 2 AND m_score <= 2 THEN 'NEED_ATTENTION'
WHEN r_score <= 2 AND f_score >= 4 AND m_score >= 4 THEN 'CANT_LOSE_THEM'
WHEN r_score <= 2 AND f_score >= 3 AND m_score >= 3 THEN 'AT_RISK'
WHEN r_score <= 2 AND f_score <= 2 AND m_score >= 3 THEN 'HIBERNATING'
ELSE 'LOST'
END as segment,
-- Predicted lifetime value (simple model)
CASE
WHEN customer_lifetime_days > 0
THEN (monetary / customer_lifetime_days) * 365 * 2 -- projected over the next 2 years
ELSE avg_order_value * 4 -- estimate for new customers
END as predicted_clv,
CURRENT_TIMESTAMP() as processed_at
FROM rfm_scores
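The scoring thresholds and segment rules above are easy to get subtly wrong, so it can help to mirror them in plain JavaScript and check a few sample customers before running the model. The sketch below is only an illustration: `score`, `rfmScores`, and `segment` are hypothetical helper names, not part of Dataform, and the thresholds are copied from the query. It checks CANT_LOSE_THEM before AT_RISK so that the narrower rule stays reachable.

```javascript
// Hypothetical helpers that mirror the RFM thresholds of the SQL model.
function score(value, thresholds, higherIsBetter) {
  // thresholds run from the score-5 boundary down to the score-2 boundary
  for (let i = 0; i < thresholds.length; i++) {
    const hit = higherIsBetter ? value >= thresholds[i] : value <= thresholds[i];
    if (hit) return 5 - i;
  }
  return 1;
}

function rfmScores({ recencyDays, frequency, monetary }) {
  return {
    // customers without orders have no recency and fall into the lowest bucket
    r: recencyDays == null ? 1 : score(recencyDays, [30, 90, 180, 365], false),
    f: score(frequency, [10, 5, 3, 2], true),
    m: score(monetary ?? 0, [1000, 500, 250, 100], true),
  };
}

function segment({ r, f, m }) {
  if (r >= 4 && f >= 4 && m >= 4) return "CHAMPIONS";
  if (r >= 3 && f >= 4 && m >= 4) return "LOYAL_CUSTOMERS";
  if (r >= 4 && f <= 2 && m >= 3) return "POTENTIAL_LOYALISTS";
  if (r >= 4 && f <= 2 && m <= 2) return "NEW_CUSTOMERS";
  if (r >= 3 && f >= 3 && m >= 3) return "PROMISING";
  if (r >= 3 && f <= 2 && m <= 2) return "NEED_ATTENTION";
  if (r <= 2 && f >= 4 && m >= 4) return "CANT_LOSE_THEM";
  if (r <= 2 && f >= 3 && m >= 3) return "AT_RISK";
  if (r <= 2 && f <= 2 && m >= 3) return "HIBERNATING";
  return "LOST";
}

// Usage: a lapsed but formerly valuable customer
console.log(segment(rfmScores({ recencyDays: 400, frequency: 6, monetary: 600 })));
```

Keeping such a mirror next to the SQL is a design choice, not a requirement; its main value is making rule ordering visible in a form that can be unit tested without scanning any data.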
Best Practices
1. Code Organization
Recommended folder structure
definitions/
├── sources/ # source data documentation
│ ├── raw_orders.sqlx
│ └── raw_customers.sqlx
├── staging/ # raw data cleansing
│ ├── ecommerce/
│ │ ├── stg_orders.sqlx
│ │ └── stg_customers.sqlx
│ └── marketing/
│ ├── stg_campaigns.sqlx
│ └── stg_ad_spend.sqlx
├── intermediate/ # intermediate transformations
│ ├── int_order_enriched.sqlx
│ └── int_customer_metrics.sqlx
├── marts/ # final business tables
│ ├── core/ # core dimension/fact tables
│ │ ├── dim_customers.sqlx
│ │ ├── dim_products.sqlx
│ │ └── fact_orders.sqlx
│ ├── marketing/ # marketing analytics
│ │ ├── customer_segmentation.sqlx
│ │ └── campaign_performance.sqlx
│ └── finance/ # finance analytics
│ ├── revenue_analysis.sqlx
│ └── cost_analysis.sqlx
├── assertions/ # data quality checks
│ ├── core/
│ │ ├── assert_unique_orders.sqlx
│ │ └── assert_positive_amounts.sqlx
│ └── business_rules/
│ ├── assert_order_logic.sqlx
│ └── assert_customer_consistency.sqlx
└── utils/ # utility views
├── date_spine.sqlx
└── business_calendar.sqlx
Standardized naming conventions
-- File naming rules
-- [layer]_[business_area]_[entity].sqlx
-- staging: stg_[source]_[entity].sqlx
-- definitions/staging/ecommerce/stg_shopify_orders.sqlx
-- intermediate: int_[description].sqlx
-- definitions/intermediate/int_orders_with_customers.sqlx
-- marts: [entity_type]_[entity].sqlx
-- definitions/marts/core/dim_customers.sqlx
-- definitions/marts/core/fact_orders.sqlx
-- assertions: assert_[rule_description].sqlx
-- definitions/assertions/core/assert_unique_customer_ids.sqlx
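Naming rules like these tend to drift unless something checks them. As a minimal sketch, the conventions above could be encoded as regular expressions and run against the repository's file paths, for example in a CI step. The `NAMING_RULES` table and `checkFileName` function are hypothetical, and the patterns are one possible reading of the rules (the marts pattern only requires a prefix and an entity part).

```javascript
// Hypothetical lint helper: one pattern per layer folder under definitions/.
const NAMING_RULES = {
  staging: /^stg_[a-z0-9]+_[a-z0-9_]+\.sqlx$/,   // stg_[source]_[entity].sqlx
  intermediate: /^int_[a-z0-9_]+\.sqlx$/,        // int_[description].sqlx
  marts: /^[a-z0-9]+_[a-z0-9_]+\.sqlx$/,         // [entity_type]_[entity].sqlx
  assertions: /^assert_[a-z0-9_]+\.sqlx$/,       // assert_[rule_description].sqlx
};

function checkFileName(path) {
  const parts = path.split("/");
  const layer = parts[1];                 // definitions/<layer>/...
  const file = parts[parts.length - 1];
  const rule = NAMING_RULES[layer];
  if (!rule) return { path, ok: true, reason: "no rule for this layer" };
  const ok = rule.test(file);
  return { path, ok, reason: ok ? "matches" : `expected ${rule}` };
}

// Usage
console.log(checkFileName("definitions/staging/ecommerce/stg_shopify_orders.sqlx").ok);
console.log(checkFileName("definitions/staging/ecommerce/orders.sqlx").ok);
```

Under this reading, a staging file without a source segment (such as `stg_orders.sqlx`) is rejected, which may or may not be the strictness a team wants.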
2. Performance Optimization
Partitioning strategy
-- Date-based partitioning (the most common choice)
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(created_date)",
requirePartitionFilter: true,
partitionExpirationDays: 1095 // 3 years
}
}
-- Integer-range partitioning
config {
type: "table",
bigquery: {
partitionBy: "RANGE_BUCKET(customer_id, GENERATE_ARRAY(0, 1000000, 1000))"
}
}
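Clustering optimization
-- Cluster on columns that match the query patterns
config {
type: "table",
bigquery: {
partitionBy: "DATE(order_date)",
clusterBy: ["customer_id", "product_category", "region"] // most frequently filtered/joined columns first
}
}
Incremental processing optimization
-- An efficient incremental processing pattern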
config {
type: "incremental",
uniqueKey: ["order_id"],
bigquery: {
partitionBy: "DATE(order_date)",
updatePartitionFilter: "DATE(order_date) >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)" // update only the last 3 days
}
}
SELECT *
FROM ${ref("source_table")}
${ when(incremental(), `
WHERE
updated_at > (SELECT MAX(updated_at) FROM ${self()})
OR order_id IN (
SELECT order_id FROM ${ref("updated_orders")} -- explicit list of updated orders
)
`) }
3. Data Quality Management
Comprehensive data validation
-- definitions/assertions/comprehensive_order_validation.sqlx
config {
type: "assertion",
description: "Comprehensive quality checks for order data"
}
-- 1. Missing required fields
SELECT 'missing_required_fields' as check_type, order_id
FROM ${ref("fact_orders")}
WHERE customer_id IS NULL OR order_date IS NULL OR total_amount IS NULL
UNION ALL
-- 2. Business rule checks
SELECT 'negative_amounts' as check_type, order_id
FROM ${ref("fact_orders")}
WHERE total_amount < 0 OR subtotal < 0
UNION ALL
-- 3. Referential integrity
SELECT 'orphaned_orders' as check_type, o.order_id
FROM ${ref("fact_orders")} o
LEFT JOIN ${ref("dim_customers")} c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL
UNION ALL
-- 4. Duplicate rows
SELECT 'duplicate_orders' as check_type, order_id
FROM ${ref("fact_orders")}
GROUP BY order_id
HAVING COUNT(*) > 1
UNION ALL
-- 5. Distribution outliers
SELECT 'suspicious_outliers' as check_type, order_id
FROM ${ref("fact_orders")}
WHERE total_amount > (
SELECT PERCENTILE_CONT(total_amount, 0.99) OVER()
FROM ${ref("fact_orders")}
LIMIT 1
) * 10 -- more than 10x the 99th percentile
Data quality metrics dashboard
-- definitions/monitoring/data_quality_dashboard.sqlx
config {
type: "table",
description: "Daily data quality metrics"
}
WITH quality_checks AS (
SELECT
'customers' as table_name,
CURRENT_DATE() as check_date,
COUNT(*) as total_records,
COUNTIF(customer_id IS NULL) as null_ids,
COUNTIF(email IS NULL OR email = '') as missing_emails,
COUNTIF(NOT REGEXP_CONTAINS(email, r'^[^@]+@[^@]+\.[^@]+$')) as invalid_emails,
COUNT(DISTINCT customer_id) as unique_customers
FROM ${ref("dim_customers")}
WHERE is_active = true
UNION ALL
SELECT
'orders',
CURRENT_DATE(),
COUNT(*),
COUNTIF(order_id IS NULL),
COUNTIF(customer_id IS NULL),
COUNTIF(total_amount IS NULL),
COUNT(DISTINCT order_id)
FROM ${ref("fact_orders")}
WHERE order_date = CURRENT_DATE()
)
SELECT
*,
-- Quality score (SAFE_DIVIDE returns NULL instead of failing when total_records is 0)
1 - SAFE_DIVIDE(null_ids + missing_emails + invalid_emails, total_records) as quality_score,
-- Alert flag
CASE
WHEN SAFE_DIVIDE(null_ids + missing_emails + invalid_emails, total_records) > 0.05 THEN 'CRITICAL'
WHEN SAFE_DIVIDE(null_ids + missing_emails + invalid_emails, total_records) > 0.01 THEN 'WARNING'
ELSE 'OK'
END as quality_status
FROM quality_checks
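The score and status arithmetic is simple enough to restate as a worked example. The sketch below reproduces the 5% and 1% thresholds from the query in JavaScript; `qualityStatus` and the `NO_DATA` status for an empty table are assumptions added for illustration and do not appear in the SQL.

```javascript
// Hypothetical helper: same error-rate thresholds as the dashboard query.
function qualityStatus({ totalRecords, nullIds, missingEmails, invalidEmails }) {
  // An empty table has no meaningful error rate (assumed NO_DATA status)
  if (totalRecords === 0) return { qualityScore: null, status: "NO_DATA" };
  const errorRate = (nullIds + missingEmails + invalidEmails) / totalRecords;
  let status = "OK";
  if (errorRate > 0.05) status = "CRITICAL";
  else if (errorRate > 0.01) status = "WARNING";
  return { qualityScore: 1 - errorRate, status };
}

// Usage: 35 problem rows out of 1000 is a 3.5% error rate
console.log(qualityStatus({ totalRecords: 1000, nullIds: 5, missingEmails: 10, invalidEmails: 20 }).status);
```

Note that a row with several problems is counted once per problem, exactly as in the query, so the error rate can exceed the share of affected rows.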
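4. Documentation and Metadata
Detailed table documentation
-- definitions/marts/core/dim_customers.sqlx
config {
type: "table",
description: "Customer dimension table. Holds master data for every customer and is implemented as SCD Type 2 to track the history of changes to customer information.",
columns: {
customer_id: "Unique customer identifier (primary key)",
customer_name: "Customer full name",
email: "Email address (unique; used for marketing communications)",
phone: "Phone number",
date_of_birth: "Date of birth",
age: "Current age (recalculated daily)",
age_group: "Age bracket (18-24, 25-34, 35-44, 45-54, 55+)",
registration_date: "Customer registration date",
total_orders: "Total number of orders",
total_spent: "Total amount spent (USD)",
avg_order_value: "Average order value",
last_order_date: "Most recent order date",
customer_segment: "Customer segment (NEW, RETURNING, LOYAL)",
is_active: "Whether this is the currently active record (SCD Type 2)",
effective_date: "Start date of record validity",
end_date: "End date of record validity (2999-12-31 means currently active)",
processed_at: "Data processing timestamp"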
},
bigquery: {
partitionBy: "DATE(effective_date)",
clusterBy: ["customer_id", "is_active"],
labels: {
team: "data",
domain: "customer",
criticality: "high"
}
}
}
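Documenting business logic
// includes/business_rules.js
/**
* Customer segmentation logic
*
* Business rules (evaluated in this order; the first match wins):
* - CHURNED: last purchase more than 90 days ago
* - NEW: registered within the last 30 days
* - RETURNING: registered more than 30 days ago, within 1 year
* - LOYAL: registered more than 1 year ago
*
* @param {string} registrationDate - name of the customer registration date column
* @param {string} lastOrderDate - name of the last order date column
* @returns {string} SQL expression containing a CASE statement
*/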
function customerSegmentation(registrationDate, lastOrderDate) {
return `
CASE
WHEN DATE_DIFF(CURRENT_DATE(), ${lastOrderDate}, DAY) > 90 THEN 'CHURNED'
WHEN DATE_DIFF(CURRENT_DATE(), ${registrationDate}, DAY) <= 30 THEN 'NEW'
WHEN DATE_DIFF(CURRENT_DATE(), ${registrationDate}, DAY) <= 365 THEN 'RETURNING'
ELSE 'LOYAL'
END
`;
}
module.exports = { customerSegmentation };
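Pricing and Cost Optimization
1. Dataform Pricing
Dataform service charges
Dataform itself is provided at no charge on Google Cloud:
- ✅ Dataform service usage: free
- ✅ Creating and managing workspaces: free
- ✅ Git repository integration: free
- ✅ Workflow execution management: free
- ✅ Web IDE usage: free
Where costs actually arise
Queries that Dataform runs are billed under BigQuery pricing:
# Cost drivers
BigQuery charges:
- Query processing (on-demand or slot-based)
- Storage
- Network (when data moves between regions)
Dataform-related:
- BigQuery query cost of workflow executions
- BigQuery cost of test runs
- BigQuery cost of assertion checks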
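2. BigQuery Pricing
2.1 Query processing cost
On-demand pricing (as of 2024; confirm current rates on the BigQuery pricing page):
- US (multi-region): $6.25 per TiB of data scanned
- Other regions: the rate varies by region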
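On-demand cost is simply bytes scanned, converted to TiB, multiplied by the regional rate. A minimal sketch of that arithmetic follows; `estimateOnDemandCostUsd` is a hypothetical helper, and the rate is an argument rather than a constant because it differs by region and has changed over time. The 6.25 in the usage line is only a sample value.

```javascript
// Hypothetical helper: on-demand cost = TiB scanned x price per TiB.
const BYTES_PER_TIB = 1024 ** 4;

function estimateOnDemandCostUsd(bytesProcessed, pricePerTiBUsd) {
  if (bytesProcessed < 0 || pricePerTiBUsd < 0) {
    throw new RangeError("inputs must be non-negative");
  }
  return (bytesProcessed / BYTES_PER_TIB) * pricePerTiBUsd;
}

// Usage: a query that scans 2 TiB at a sample rate of 6.25 USD per TiB
console.log(estimateOnDemandCostUsd(2 * BYTES_PER_TIB, 6.25)); // 12.5
```

This ignores billing details such as minimum bytes per query and free-tier allowances, so treat the result as an estimate.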
-- Example: estimating query cost
-- definitions/monitoring/query_cost_estimation.sqlx
config { type: "table" }
WITH query_analysis AS (
SELECT
job_id,
user_email,
creation_time,
total_bytes_processed / 1024 / 1024 / 1024 / 1024 as tb_processed,
-- US on-demand rate of $6.25/TiB
(total_bytes_processed / 1024 / 1024 / 1024 / 1024) * 6.25 as estimated_cost_usd,
query
FROM `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE
DATE(creation_time) = CURRENT_DATE()
AND job_type = 'QUERY'
AND state = 'DONE'
AND CONTAINS_SUBSTR(query, 'dataform')
)
SELECT
DATE(creation_time) as query_date,
COUNT(*) as total_dataform_queries,
ROUND(SUM(tb_processed), 3) as total_tb_processed,
ROUND(SUM(estimated_cost_usd), 2) as estimated_daily_cost_usd
FROM query_analysis
GROUP BY DATE(creation_time)
ORDER BY query_date DESC
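Slot-based pricing (reserved capacity):
The figures below are the legacy flat-rate prices. Since July 2023, new capacity is purchased through BigQuery editions and billed per slot-hour, so check the current pricing page before budgeting.
- Flex Slots: $0.04 per slot per hour
- Annual commitment: $1,700 per 100 slots per month
- Monthly commitment: $2,000 per 100 slots per month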
2.2 Storage cost
- Active storage: $0.020 per GB per month
- Long-term storage (not modified for 90 consecutive days): $0.010 per GB per month
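Because the two storage classes are billed at different rates, a table's monthly cost is the sum of its active and long-term portions. The sketch below works that out; `monthlyStorageCostUsd` is a hypothetical helper and its default rates are the two list prices quoted above.

```javascript
// Hypothetical helper: active and long-term bytes are priced separately.
const BYTES_PER_GIB = 1024 ** 3;

function monthlyStorageCostUsd(activeBytes, longTermBytes, rates = { active: 0.02, longTerm: 0.01 }) {
  const activeGiB = activeBytes / BYTES_PER_GIB;
  const longTermGiB = longTermBytes / BYTES_PER_GIB;
  return activeGiB * rates.active + longTermGiB * rates.longTerm;
}

// Usage: 100 GiB of active data plus 200 GiB of long-term data
console.log(monthlyStorageCostUsd(100 * BYTES_PER_GIB, 200 * BYTES_PER_GIB).toFixed(2));
```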
-- Storage cost monitoring
-- definitions/monitoring/storage_cost_analysis.sqlx
config { type: "table" }
SELECT
table_schema,
table_name,
ROUND(total_logical_bytes / 1024 / 1024 / 1024, 2) as size_gb,
-- Upper bound: everything billed at the active rate of $0.020/GB/month
ROUND((total_logical_bytes / 1024 / 1024 / 1024) * 0.020, 2) as monthly_storage_cost_usd,
storage_last_modified_time,
-- The view already splits bytes into active and long-term, so no 90-day check is needed
ROUND((active_logical_bytes / 1024 / 1024 / 1024) * 0.020
+ (long_term_logical_bytes / 1024 / 1024 / 1024) * 0.010, 2) as actual_monthly_cost_usd
FROM `region-us.INFORMATION_SCHEMA.TABLE_STORAGE_BY_PROJECT`
WHERE table_schema NOT IN ('INFORMATION_SCHEMA', 'sys')
ORDER BY size_gb DESC
3. Dataform Cost Optimization Strategies
3.1 Reducing cost through query optimization
Use partitioning:
-- Minimize scanned data with a partition filter
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(order_date)",
requirePartitionFilter: true
}
}
SELECT *
FROM ${ref("fact_orders")}
WHERE DATE(order_date) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) -- the partition filter is required
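Reduce cost with incremental processing:
-- Processing only new rows instead of rebuilding the table can cut scanned bytes sharply; the saving depends on how small each increment is relative to the full table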
config {
type: "incremental",
uniqueKey: ["order_id"]
}
SELECT *
FROM ${ref("raw_orders")}
${ when(incremental(), `
WHERE updated_at > (SELECT MAX(updated_at) FROM ${self()})
`) }
Minimize scanning by selecting columns:
-- ❌ Inefficient: scans every column
SELECT * FROM large_table
-- ✅ Efficient: selects only the columns needed
SELECT
order_id,
customer_id,
total_amount,
order_date
FROM large_table
3.2 Test cost optimization
-- Save cost by testing against sampled data
-- definitions/staging/stg_orders_sample.sqlx
config {
type: "table",
disabled: dataform.projectConfig.vars.environment !== 'test'
}
SELECT *
FROM ${ref("raw_orders")}
WHERE MOD(ABS(FARM_FINGERPRINT(CAST(order_id AS STRING))), 100) < 1 -- 1% sample; FARM_FINGERPRINT needs a STRING or BYTES argument
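3.3 Efficient execution through scheduling
// includes/cost_optimization.js
// Note: Dataform evaluates JavaScript at compile time, so both helpers reflect the moment of compilation (in the compiler's time zone), not the moment each query runs.
// Run outside business hours to minimize slot contention
function isOffPeakHours() {
const currentHour = new Date().getHours();
return currentHour < 8 || currentHour > 18;
}
// Batch the expensive work: full refreshes run only on weekends (0 = Sunday, 6 = Saturday)
function shouldRunFullRefresh(tableName) {
const dayOfWeek = new Date().getDay();
return dayOfWeek === 0 || dayOfWeek === 6;
}
module.exports = { isOffPeakHours, shouldRunFullRefresh };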
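Helpers that read the clock directly are hard to test and depend on the machine's time zone. One possible variant, sketched below, takes the instant and a fixed UTC offset as arguments so the result is deterministic. The function names and the `utcOffsetHours` parameter are assumptions for illustration, and a fixed offset deliberately ignores daylight saving time.

```javascript
// Hypothetical testable variants: the caller supplies the instant and the offset.
function shiftToOffset(date, utcOffsetHours) {
  // Move the instant so that its UTC fields read as local wall-clock time
  return new Date(date.getTime() + utcOffsetHours * 3600 * 1000);
}

function isOffPeakAt(date, utcOffsetHours = 0) {
  const hour = shiftToOffset(date, utcOffsetHours).getUTCHours();
  return hour < 8 || hour > 18;
}

function isWeekendAt(date, utcOffsetHours = 0) {
  const day = shiftToOffset(date, utcOffsetHours).getUTCDay();
  return day === 0 || day === 6; // 0 = Sunday, 6 = Saturday
}

// Usage: 03:00 UTC is off-peak in UTC but midday at UTC+9
const instant = new Date("2024-01-15T03:00:00Z");
console.log(isOffPeakAt(instant, 0), isOffPeakAt(instant, 9)); // true false
```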
4. Cost Monitoring and Alerts
4.1 Daily cost tracking
-- definitions/monitoring/daily_cost_tracking.sqlx
config {
type: "incremental",
uniqueKey: ["cost_date", "project_id"]
}
WITH daily_costs AS (
SELECT
DATE(usage_start_time) as cost_date,
project.id as project_id,
service.description as service_name,
SUM(cost) as total_cost_usd,
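SUM(CASE WHEN sku.description LIKE '%Analysis%' THEN cost ELSE 0 END) as query_cost_usd, -- assumption: query SKUs are described as "Analysis"; check the SKU names in your billing export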
SUM(CASE WHEN sku.description LIKE '%Storage%' THEN cost ELSE 0 END) as storage_cost_usd
FROM `project.dataset.gcp_billing_export_v1_BILLING_ACCOUNT_ID`
WHERE
service.description = 'BigQuery'
AND DATE(usage_start_time) >= CURRENT_DATE() - 30
GROUP BY 1, 2, 3
)
, with_change AS (
SELECT
*,
-- Day-over-day change, computed before the incremental filter so that LAG can still see the previous day
LAG(total_cost_usd) OVER (
PARTITION BY project_id
ORDER BY cost_date
) as previous_day_cost,
SAFE_DIVIDE(
total_cost_usd - LAG(total_cost_usd) OVER (PARTITION BY project_id ORDER BY cost_date),
LAG(total_cost_usd) OVER (PARTITION BY project_id ORDER BY cost_date)
) * 100 as cost_change_pct
FROM daily_costs
)
SELECT *
FROM with_change
${ when(incremental(), `
WHERE cost_date > (SELECT MAX(cost_date) FROM ${self()})
`) }
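The day-over-day percentage is (today - yesterday) / yesterday x 100, with no value when there is no previous day or the previous cost is zero. The sketch below applies the same rule to an in-memory list; `withDayOverDayChange` and the row field names are hypothetical.

```javascript
// Hypothetical helper: adds the previous day's cost and the percentage change.
function withDayOverDayChange(rows) {
  // ISO dates (YYYY-MM-DD) sort correctly as strings
  const sorted = [...rows].sort((a, b) => a.costDate.localeCompare(b.costDate));
  return sorted.map((row, i) => {
    const prev = i > 0 ? sorted[i - 1].totalCostUsd : null;
    // Like SAFE_DIVIDE, yield null when the denominator is missing or zero
    const costChangePct = prev ? ((row.totalCostUsd - prev) / prev) * 100 : null;
    return { ...row, previousDayCost: prev, costChangePct };
  });
}

// Usage
const result = withDayOverDayChange([
  { costDate: "2024-01-02", totalCostUsd: 150 },
  { costDate: "2024-01-01", totalCostUsd: 100 },
]);
console.log(result[1].costChangePct); // 50
```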
4.2 Cost alert setup
-- definitions/assertions/cost_alert_thresholds.sqlx
config {
type: "assertion",
description: "Alert when the daily cost exceeds the threshold"
}
WITH cost_summary AS (
SELECT
DATE(usage_start_time) as cost_date,
SUM(cost) as daily_cost_usd
FROM `project.dataset.gcp_billing_export_v1_BILLING_ACCOUNT_ID`
WHERE
service.description = 'BigQuery'
AND DATE(usage_start_time) = CURRENT_DATE()
GROUP BY 1
)
SELECT
cost_date,
daily_cost_usd,
'DAILY_COST_THRESHOLD_EXCEEDED' as alert_type
FROM cost_summary
WHERE daily_cost_usd > 100 -- $100 threshold
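5. Cost Optimization Best Practices
5.1 Development environment cost optimization
// .df/profiles.json - per-environment settings (an illustrative layout; confirm how your Dataform version loads per-environment variables)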
{
"dev": {
"projectId": "my-project-dev",
"location": "US",
"vars": {
"sample_percentage": 0.01,
"enable_clustering": false,
"max_rows_for_preview": 1000
}
},
"prod": {
"projectId": "my-project-prod",
"location": "US",
"vars": {
"sample_percentage": 1.0,
"enable_clustering": true,
"max_rows_for_preview": 10000
}
}
}
-- Per-environment data sampling
-- definitions/staging/stg_orders_optimized.sqlx
-- The table stays enabled in every environment; only the row sample below changes in dev
config {
type: "table"
}
SELECT *
FROM ${ref("raw_orders")}
${ when(dataform.projectConfig.vars.environment === 'dev', `
WHERE MOD(ABS(FARM_FINGERPRINT(CAST(order_id AS STRING))), 100)
< ${dataform.projectConfig.vars.sample_percentage * 100}
`) }
5.2 Query performance monitoring
-- definitions/monitoring/expensive_queries_analysis.sqlx
config {
type: "table",
description: "Analysis of high-cost queries"
}
SELECT
job_id,
user_email,
creation_time,
ROUND(total_bytes_processed / 1024 / 1024 / 1024, 2) as gb_processed,
ROUND(total_slot_ms / 1000 / 60, 2) as slot_minutes,
ROUND((total_bytes_processed / 1024 / 1024 / 1024 / 1024) * 6.25, 2) as estimated_cost_usd, -- US on-demand rate of $6.25/TiB
-- Query pattern analysis
CASE
WHEN CONTAINS_SUBSTR(query, 'SELECT *') THEN 'FULL_TABLE_SCAN'
WHEN CONTAINS_SUBSTR(query, 'GROUP BY') AND NOT CONTAINS_SUBSTR(query, 'LIMIT') THEN 'LARGE_AGGREGATION'
WHEN CONTAINS_SUBSTR(query, 'JOIN') AND NOT CONTAINS_SUBSTR(query, 'WHERE') THEN 'CARTESIAN_JOIN_RISK'
ELSE 'OPTIMIZED'
END as query_pattern,
LEFT(query, 200) as query_preview
FROM `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE
DATE(creation_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
AND job_type = 'QUERY'
AND state = 'DONE'
AND total_bytes_processed > 10 * 1024 * 1024 * 1024 -- only jobs that processed more than 10 GB
ORDER BY estimated_cost_usd DESC
LIMIT 50
6. Monthly Cost Forecast
-- definitions/monitoring/monthly_cost_forecast.sqlx
config {
type: "table",
description: "Estimated monthly cost"
}
WITH usage_trends AS (
SELECT
DATE_TRUNC(DATE(creation_time), MONTH) as month,
COUNT(*) as total_queries,
SUM(total_bytes_processed) / 1024 / 1024 / 1024 / 1024 as total_tb_processed,
SUM(total_bytes_processed) / 1024 / 1024 / 1024 / 1024 * 6.25 as total_query_cost_usd -- US on-demand rate of $6.25/TiB
FROM `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE
DATE(creation_time) >= DATE_SUB(DATE_TRUNC(CURRENT_DATE(), MONTH), INTERVAL 3 MONTH)
AND job_type = 'QUERY'
AND state = 'DONE'
GROUP BY 1
),
storage_costs AS (
SELECT
DATE_TRUNC(CURRENT_DATE(), MONTH) as month,
SUM(total_logical_bytes) / 1024 / 1024 / 1024 as total_storage_gb,
SUM(total_logical_bytes) / 1024 / 1024 / 1024 * 0.020 as monthly_storage_cost_usd
FROM `region-us.INFORMATION_SCHEMA.TABLE_STORAGE_BY_PROJECT`
)
SELECT
u.month,
u.total_queries,
ROUND(u.total_tb_processed, 3) as tb_processed,
ROUND(u.total_query_cost_usd, 2) as query_cost_usd,
ROUND(s.monthly_storage_cost_usd, 2) as storage_cost_usd,
ROUND(u.total_query_cost_usd + COALESCE(s.monthly_storage_cost_usd, 0), 2) as total_monthly_cost_usd,
-- Forecast for next month (based on the average of the most recent 3 months)
ROUND(
AVG(u.total_query_cost_usd) OVER (
ORDER BY u.month
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) + COALESCE(s.monthly_storage_cost_usd, 0),
2
) as forecasted_next_month_usd
FROM usage_trends u
LEFT JOIN storage_costs s ON u.month = s.month
ORDER BY u.month DESC
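The forecast is a trailing average of recent monthly query costs plus the current storage cost. The sketch below shows that calculation on plain numbers; `forecastNextMonth` is a hypothetical helper and the three-month window mirrors the window frame in the query.

```javascript
// Hypothetical helper: trailing-average query cost plus current storage cost.
function forecastNextMonth(monthlyQueryCosts, currentStorageCost = 0, windowSize = 3) {
  if (monthlyQueryCosts.length === 0) return null;
  // Costs are ordered oldest to newest; keep only the most recent months
  const recent = monthlyQueryCosts.slice(-windowSize);
  const average = recent.reduce((sum, cost) => sum + cost, 0) / recent.length;
  return Math.round((average + currentStorageCost) * 100) / 100;
}

// Usage: average of the last three months (200, 300, 400) plus 50 for storage
console.log(forecastNextMonth([100, 200, 300, 400], 50)); // 350
```

A flat average reacts slowly to growth; a team with a steadily rising bill might prefer a trend-based estimate instead.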
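7. Cost-Saving Checklist
✅ Query optimization
- Use partition filters (WHERE DATE(column) >= '2024-01-01')
- Select only the columns you need (avoid SELECT *)
- Use incremental processing (type: "incremental")
- Improve query performance with clustering
- Use appropriate data types
✅ Table optimization
- Set a partition expiration policy (partitionExpirationDays)
- Clean up unneeded tables
- Remove duplicate data
- Use data types that compress well
✅ Development process
- Use data sampling in development environments
- Keep test queries to a minimum
- Build a cost monitoring dashboard
- Review costs on a regular schedule
✅ Operations
- Run batch jobs during off-peak hours
- Evaluate slot reservations for large workloads
- Minimize data movement between regions
- Set up automated cost alerts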
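Troubleshooting
1. Common Errors and Fixes
1.1 Circular dependency error
Error: Circular dependency detected: table_a -> table_b -> table_a
Fix:
// includes/temp_tables.js
// Builds a statement that materializes `query` into a helper table named <tableName>_temp.
// `tableName` must be a plain, unquoted path such as "dataform_staging.order_counts".
function createTempTable(tableName, query) {
return `CREATE OR REPLACE TABLE \`${tableName}_temp\` AS (${query})`;
}
module.exports = { createTempTable };
-- definitions/table_a.sqlx (a sketch; the table and dataset names are illustrative)
-- Break the cycle by materializing the shared aggregate in a helper table, so table_a no longer needs ref("table_b")
config { type: "table" }
pre_operations {
${temp_tables.createTempTable("dataform_staging.order_counts", `SELECT customer_id, COUNT(*) as order_count FROM ${ref("raw_orders")} GROUP BY 1`)}
}
SELECT customer_id, order_count
FROM `dataform_staging.order_counts_temp`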
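Before restructuring anything, it helps to know exactly which tables form the loop. The sketch below is a generic depth-first search over a dependency map, not Dataform's own implementation; `findCycle` and the graph shape (table name mapped to the tables it depends on) are assumptions for illustration.

```javascript
// Hypothetical helper: returns the first dependency cycle found, or null.
function findCycle(graph) {
  const state = new Map(); // node -> "visiting" | "done"
  const stack = [];

  function visit(node) {
    if (state.get(node) === "done") return null;
    if (state.get(node) === "visiting") {
      // Reached a node that is already on the current path: that path is the cycle
      return [...stack.slice(stack.indexOf(node)), node];
    }
    state.set(node, "visiting");
    stack.push(node);
    for (const dep of graph[node] || []) {
      const cycle = visit(dep);
      if (cycle) return cycle;
    }
    stack.pop();
    state.set(node, "done");
    return null;
  }

  for (const node of Object.keys(graph)) {
    const cycle = visit(node);
    if (cycle) return cycle;
  }
  return null;
}

// Usage
const cycle = findCycle({ table_a: ["table_b"], table_b: ["table_a"] });
console.log(cycle.join(" -> ")); // table_a -> table_b -> table_a
```

Once the loop is identified, the usual remedy is to move the shared logic into a third table that both sides can reference.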
1.2 Out-of-memory (resources exceeded) error
Error: Resources exceeded during query execution
Fix:
-- Process large data in smaller batches
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(order_date)",
updatePartitionFilter: "DATE(order_date) = CURRENT_DATE()" // one day at a time
}
}
-- Alternatively, filter early so the aggregation works on less data
-- (a WITH clause only names the subquery; BigQuery does not materialize it)
WITH large_aggregation AS (
SELECT
customer_id,
SUM(total_amount) as total_amount
FROM ${ref("fact_orders")}
WHERE DATE(order_date) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY customer_id
)
SELECT * FROM large_aggregation
1.3 Missing partition filter error
Error: Cannot query over table without partition filter
Fix:
-- Always include a partition filter
SELECT *
FROM ${ref("partitioned_table")}
WHERE DATE(created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY) -- required partition filter
-- Or remove the requirement in the table definition
config {
bigquery: {
partitionBy: "DATE(created_at)",
requirePartitionFilter: false
}
}
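2. Resolving Performance Problems
2.1 Slot shortage
-- Adjust query priority
-- Note: confirm that your Dataform version accepts jobPriority and maximumBytesBilled under `bigquery` before relying on them
config {
type: "table",
bigquery: {
jobPriority: "INTERACTIVE", // or "BATCH"
maximumBytesBilled: 10000000000 // cap at 10 GB
}
}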
2.2 Incremental processing performance
-- Watermark table for efficient incremental processing
-- definitions/utils/processing_watermarks.sqlx
config { type: "table" }
SELECT
'orders' as table_name,
MAX(updated_at) as last_processed_timestamp,
CURRENT_TIMESTAMP() as watermark_updated_at
FROM ${ref("stg_orders")}
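-- Watermark-based incremental filter, to be placed in a downstream incremental table
-- (not in stg_orders itself: stg_orders -> processing_watermarks -> stg_orders would be a circular dependency)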
${ when(incremental(), `
WHERE updated_at > (
SELECT last_processed_timestamp
FROM ${ref("processing_watermarks")}
WHERE table_name = 'orders'
)
`) }
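3. Data Consistency Problems
3.1 Time zone issues
-- Consistent time zone handling
-- Project variables are defined in dataform.json, not in a table's config block:
# dataform.json
{
"vars": {
"default_timezone": "America/New_York"
}
}
-- In the SQLX file: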
SELECT
order_id,
-- Convert every timestamp to the same time zone
DATETIME(TIMESTAMP(order_timestamp), "${dataform.projectConfig.vars.default_timezone}") as order_datetime_local,
DATE(TIMESTAMP(order_timestamp), "${dataform.projectConfig.vars.default_timezone}") as order_date_local
FROM ${ref("raw_orders")}
3.2 Data type mismatches
// Safe type conversion helpers
// includes/type_conversion.js
function safeCast(column, targetType, defaultValue = 'NULL') {
// SAFE_CAST yields NULL for NULL input and for failed conversions; COALESCE then applies the default
return `COALESCE(SAFE_CAST(${column} AS ${targetType}), ${defaultValue})`;
}
function safeNumeric(column, defaultValue = '0') {
return safeCast(column, 'NUMERIC', defaultValue);
}
module.exports = { safeCast, safeNumeric };
4. Monitoring and Alerts
4.1 Failure detection and alerting
-- definitions/monitoring/workflow_health_check.sqlx
config {
type: "table",
description: "Workflow health monitoring"
}
WITH recent_runs AS (
SELECT
workflow_id,
execution_time,
status,
error_message,
ROW_NUMBER() OVER (PARTITION BY workflow_id ORDER BY execution_time DESC) as rn
FROM `project.dataset.dataform_execution_logs`
WHERE DATE(execution_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
)
SELECT
workflow_id,
execution_time as last_run_time,
status,
error_message,
CASE
WHEN status = 'FAILED' THEN 'CRITICAL'
WHEN status = 'CANCELLED' THEN 'WARNING'
WHEN TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), execution_time, HOUR) > 24 THEN 'STALE'
ELSE 'OK'
END as health_status
FROM recent_runs
WHERE rn = 1
4.2 Data freshness monitoring
-- definitions/monitoring/data_freshness.sqlx
config { type: "table" }
SELECT
'fact_orders' as table_name,
MAX(order_date) as latest_data_date,
CURRENT_DATE() as check_date,
DATE_DIFF(CURRENT_DATE(), MAX(order_date), DAY) as days_stale,
CASE
WHEN DATE_DIFF(CURRENT_DATE(), MAX(order_date), DAY) > 2 THEN 'STALE'
WHEN DATE_DIFF(CURRENT_DATE(), MAX(order_date), DAY) > 1 THEN 'WARNING'
ELSE 'FRESH'
END as freshness_status
FROM ${ref("fact_orders")}
UNION ALL
SELECT
'dim_customers',
MAX(DATE(effective_date)),
CURRENT_DATE(),
DATE_DIFF(CURRENT_DATE(), MAX(DATE(effective_date)), DAY),
CASE
WHEN DATE_DIFF(CURRENT_DATE(), MAX(DATE(effective_date)), DAY) > 1 THEN 'STALE'
ELSE 'FRESH'
END
FROM ${ref("dim_customers")}
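The freshness rule reduces to counting whole days between the latest data date and the check date, then comparing against two thresholds. The sketch below restates it in JavaScript; `daysStale` and `freshnessStatus` are hypothetical helpers, and the defaults (warning after 1 day, stale after 2) follow the fact_orders branch of the query.

```javascript
// Hypothetical helpers mirroring the freshness thresholds of the query.
const MS_PER_DAY = 24 * 60 * 60 * 1000;

function daysStale(latestDataDate, checkDate) {
  // Both arguments are "YYYY-MM-DD" strings, which Date.parse reads as UTC midnight
  return Math.round((Date.parse(checkDate) - Date.parse(latestDataDate)) / MS_PER_DAY);
}

function freshnessStatus(days, { warnAfter = 1, staleAfter = 2 } = {}) {
  if (days > staleAfter) return "STALE";
  if (days > warnAfter) return "WARNING";
  return "FRESH";
}

// Usage: data last loaded three days before the check
console.log(freshnessStatus(daysStale("2024-01-12", "2024-01-15"))); // STALE
```

Passing `{ warnAfter: 1, staleAfter: 1 }` reproduces the stricter two-state rule used for dim_customers.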
5. Cost Optimization
5.1 Query cost monitoring
-- definitions/monitoring/query_cost_analysis.sqlx
config { type: "table" }
SELECT
DATE(creation_time) as query_date,
user_email,
job_type,
COUNT(*) as query_count,
SUM(total_bytes_processed) / 1024 / 1024 / 1024 as total_gb_processed,
AVG(total_bytes_processed) / 1024 / 1024 / 1024 as avg_gb_per_query,
-- BigQuery on-demand price: $6.25 per TiB (US, as of 2024)
SUM(total_bytes_processed) / 1024 / 1024 / 1024 / 1024 * 6.25 as estimated_cost_usd
FROM `region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE
DATE(creation_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
AND job_type = 'QUERY'
AND state = 'DONE'
AND total_bytes_processed > 0
GROUP BY 1, 2, 3
ORDER BY estimated_cost_usd DESC
5.2 Table size optimization
-- definitions/monitoring/table_storage_analysis.sqlx
config { type: "table" }
WITH storage AS (
SELECT
table_schema,
table_name,
total_logical_bytes / 1024 / 1024 / 1024 as size_gb,
total_rows,
total_partitions
FROM `region-us.INFORMATION_SCHEMA.TABLE_STORAGE_BY_PROJECT`
WHERE table_schema NOT IN ('INFORMATION_SCHEMA', 'sys')
)
SELECT
table_schema,
table_name,
ROUND(size_gb, 2) as size_gb,
total_rows,
total_partitions,
ROUND(SAFE_DIVIDE(size_gb * 1024 * 1024, total_rows), 2) as avg_row_size_kb,
-- Recommendation (this view has no clustering column, so clustering must be checked separately)
CASE
WHEN total_partitions > 0 AND size_gb > 100 THEN 'CONSIDER_PARTITION_EXPIRATION'
WHEN total_partitions = 0 AND size_gb > 50 THEN 'CONSIDER_PARTITIONING_OR_CLUSTERING'
WHEN total_partitions > 0 AND total_rows < 1000000 AND size_gb < 1 THEN 'TOO_SMALL_FOR_PARTITIONING'
ELSE 'OPTIMIZED'
END as optimization_recommendation
FROM storage
ORDER BY size_gb DESC
This document is a comprehensive guide to using BigQuery Dataform. Use the examples as a reference and adapt them to your project's requirements when building real data pipelines.