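Scheduled Queries
Query Automation
A comprehensive guide to building automated data pipelines with scheduled queries in BigQuery.
Table of Contents
- Scheduled query overview
- Schedule configuration
  - 2.1 Basic schedule patterns
  - 2.2 Using cron expressions
  - 2.3 Configuring a schedule in the web UI
  - 2.4 Advanced schedule configuration with the CLI
  - 2.5 Modifying and managing schedules
  - 2.6 Dynamic schedule management
  - 2.7 Using schedule templates
- Building data pipelines
- Incremental processing strategies
- Error handling and retries
- Monitoring and alerting
- Performance optimization
- Real-world use cases
- Best practices
  - 9.1 Schedule design principles
  - 9.2 Code structure and modularization
  - 9.3 Testing and validation
  - 9.4 Documentation and metadata management
  - 9.5 Operational guidelines
1. Scheduled query overview
1.1 What is a scheduled query?
A scheduled query is a BigQuery query that runs automatically at a specified time. It can automate ETL pipelines, data aggregation, report generation, and similar recurring work.
1.2 Key features
- Automatic execution: runs automatically on a defined schedule
- Result storage: saves query results to a table automatically
- Retry mechanism: retries automatically on failure
- Notifications: sends notifications about run results
- Version control: tracks the history of query changes
1.3 Basic workflow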
# Create a scheduled query with the bq CLI
bq mk \
--transfer_config \
--project_id=PROJECT_ID \
--target_dataset=DATASET_ID \
--display_name="Daily Sales Summary" \
--data_source=scheduled_query \
--schedule="every 24 hours" \
--params='{
"query": "SELECT DATE(order_timestamp) as date, SUM(amount) as total_sales FROM `project.dataset.orders` WHERE DATE(order_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) GROUP BY date",
"destination_table_name_template": "daily_sales_{run_date}",
"write_disposition": "WRITE_TRUNCATE"
}'
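To make the destination_table_name_template setting above concrete, the following Python sketch mimics how the {run_date} placeholder could resolve to a dated table name. The helper resolve_table_name is hypothetical and only illustrates the naming pattern, assuming the run date is rendered as YYYYMMDD; it does not call BigQuery.

```python
from datetime import date


def resolve_table_name(template: str, run_date: date) -> str:
    """Replace the {run_date} placeholder with the run date as YYYYMMDD."""
    return template.replace("{run_date}", run_date.strftime("%Y%m%d"))


if __name__ == "__main__":
    # prints daily_sales_20240115
    print(resolve_table_name("daily_sales_{run_date}", date(2024, 1, 15)))
```

With WRITE_TRUNCATE, each run would then overwrite only the table for its own date.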
2. Schedule configuration
2.1 Basic schedule patterns
-- Run every day at 06:00
-- Schedule: "every day 06:00"
-- Run every Monday at 09:00
-- Schedule: "every monday 09:00"
-- Run on the 1st of every month at 08:00
-- Schedule: "1 of month 08:00"
-- Run every hour
-- Schedule: "every 1 hours"
-- Run every 30 minutes
-- Schedule: "every 30 minutes"
-- Set a specific time zone
-- Schedule: "every day 14:00"
-- Timezone: "Asia/Seoul"
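2.2 Using cron expressions
# Note: the bq --schedule flag is documented with the schedule syntax shown in 2.1.
# Confirm that unix-cron strings are accepted in your environment before relying on them.
# Run every day at 02:30
# Cron: "30 2 * * *"
# Run on weekdays at 09:00 (Monday to Friday)
# Cron: "0 9 * * 1-5"
# Run on the 15th of every month at 18:00
# Cron: "0 18 15 * *"
# Run every 15 minutes
# Cron: "*/15 * * * *"
# Run every Sunday at midnight
# Cron: "0 0 * * 0"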
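2.3 Configuring a schedule in the web UI
Step-by-step instructions for creating a scheduled query in the BigQuery web console:
Step 1: Write the query and start the schedule setup
-- Example: daily sales aggregation query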
SELECT
DATE(order_timestamp) as order_date,
COUNT(DISTINCT order_id) as total_orders,
SUM(order_amount) as total_revenue,
COUNT(DISTINCT customer_id) as unique_customers,
AVG(order_amount) as avg_order_value
FROM `project.raw.orders`
WHERE DATE(order_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY order_date;
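Step 2: Configure the schedule options
- After running the query, click "Create scheduled query".
- Enter the basic information:
  - Query name: daily_sales_summary
  - Description: Daily sales metrics aggregation
- Schedule settings:
  - Repeat type: Daily
  - Start time: 02:00
  - Time zone: Asia/Seoul
  - Start date: Today
  - End date: Not set (runs indefinitely)
- Advanced schedule options:
  - Retry setting: up to 3 times
  - Retry interval: 10 minutes
  - Execution timeout: 6 hours
Step 3: Configure the destination
Project ID: my-project
Dataset ID: analytics
Table ID: daily_sales_summary
Partitioning: by the order_date column (optional)
Write preference: overwrite daily (WRITE_TRUNCATE)
Step 4: Configure notifications
Email notification: data-team@company.com
Notify on failure only: checked
Pub/Sub notification: projects/my-project/topics/bq-notifications (optional)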
2.4 Advanced schedule configuration with the CLI
# Example: a scheduled query that calls a stored procedure.
# @run_date is a built-in DATE parameter of scheduled queries, so no
# user-defined query_parameters block is needed. A CALL statement runs as a
# script, so no destination table is configured here; the procedure writes
# its own output tables.
bq mk \
--transfer_config \
--project_id=my-project \
--target_dataset=analytics \
--display_name="Advanced Daily ETL Pipeline" \
--data_source=scheduled_query \
--schedule="every day 02:00" \
--time_zone="Asia/Seoul" \
--notification_pubsub_topic="projects/my-project/topics/etl-notifications" \
--params='{
"query": "CALL `my-project.etl.daily_processing_pipeline`(@run_date)"
}'
# Scheduling a query that is parameterized by the run date.
# The reporting window is derived from the built-in @run_date parameter
# inside the query, and {run_date} in the table name resolves to YYYYMMDD.
bq mk \
--transfer_config \
--project_id=my-project \
--target_dataset=reporting \
--display_name="Weekly Report with Parameters" \
--data_source=scheduled_query \
--schedule="every monday 09:00" \
--params='{
"query": "SELECT customer_segment, SUM(revenue) as total_revenue FROM `my-project.analytics.customer_metrics` WHERE report_date BETWEEN DATE_SUB(@run_date, INTERVAL 7 DAY) AND DATE_SUB(@run_date, INTERVAL 1 DAY) GROUP BY customer_segment",
"destination_table_name_template": "weekly_report_{run_date}"
}'
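The weekly report above covers the seven days that end on the day before the run date. As a minimal sketch of that window arithmetic, the hypothetical helper below computes the start and end dates in plain Python so the boundaries can be checked before the query is scheduled.

```python
from datetime import date, timedelta
from typing import Tuple


def report_window(run_date: date, days: int = 7) -> Tuple[date, date]:
    """Return (start, end): the `days` days that end the day before run_date."""
    start = run_date - timedelta(days=days)
    end = run_date - timedelta(days=1)
    return start, end


if __name__ == "__main__":
    start, end = report_window(date(2024, 1, 15))
    # prints 2024-01-08 2024-01-14
    print(start, end)
```

Because both bounds are inclusive in a BETWEEN filter, a Monday run covers the previous Monday through Sunday.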
2.5 Modifying and managing schedules
Modifying an existing schedule
# Change the schedule time
bq update \
--transfer_config \
--schedule="every day 01:00" \
projects/123/locations/us/transferConfigs/456
# Change the query text
bq update \
--transfer_config \
--params='{
"query": "SELECT DATE(created_at) as date, COUNT(*) as count FROM `project.dataset.new_table` WHERE DATE(created_at) = CURRENT_DATE() GROUP BY date"
}' \
projects/123/locations/us/transferConfigs/456
# Change the target dataset
bq update \
--transfer_config \
--target_dataset=new_dataset \
projects/123/locations/us/transferConfigs/456
# Change the notification settings
bq update \
--transfer_config \
--notification_pubsub_topic=projects/my-project/topics/new-notifications \
projects/123/locations/us/transferConfigs/456
Pausing and resuming a schedule
# Pause the schedule
bq update \
--transfer_config \
--no_auto_scheduling=true \
projects/123/locations/us/transferConfigs/456
# Resume the schedule
bq update \
--transfer_config \
--no_auto_scheduling=false \
projects/123/locations/us/transferConfigs/456
# Trigger a run immediately (independent of the schedule)
bq mk \
--transfer_run \
--run_time="2024-01-15T10:00:00Z" \
projects/123/locations/us/transferConfigs/456
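When several past days have to be re-run, the run_time values can be generated rather than typed by hand. The sketch below is a hypothetical helper that produces one UTC timestamp per day in the same format as the --run_time value above; it only builds strings and does not invoke bq.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def backfill_run_times(start: datetime, days: int) -> List[str]:
    """Return one timestamp string per day, starting at `start` (assumed UTC)."""
    return [
        (start + timedelta(days=i)).strftime("%Y-%m-%dT%H:%M:%SZ")
        for i in range(days)
    ]


if __name__ == "__main__":
    first = datetime(2024, 1, 15, 10, 0, tzinfo=timezone.utc)
    for run_time in backfill_run_times(first, 3):
        print(run_time)
```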
2.6 Dynamic schedule management
-- Procedure that runs the job only when business conditions are met
CREATE OR REPLACE PROCEDURE `project.automation.conditional_scheduler`()
BEGIN
DECLARE should_run BOOL DEFAULT FALSE;
DECLARE current_hour INT64;
DECLARE current_day STRING;
DECLARE is_holiday BOOL DEFAULT FALSE;
SET current_hour = EXTRACT(HOUR FROM CURRENT_DATETIME());
SET current_day = FORMAT_DATETIME('%A', CURRENT_DATETIME());
-- Check whether today is a holiday
SET is_holiday = (
SELECT COUNT(*) > 0
FROM `project.reference.holidays`
WHERE holiday_date = CURRENT_DATE()
);
-- Execution condition based on business rules
IF current_day IN ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday')
AND current_hour BETWEEN 8 AND 18
AND NOT is_holiday THEN
SET should_run = TRUE;
END IF;
-- Run the actual work only when the condition is met
IF should_run THEN
CALL `project.etl.daily_data_processing`();
-- Record the execution in the log
INSERT INTO `project.monitoring.scheduler_log` (
execution_date,
procedure_name,
execution_reason,
executed_at
) VALUES (
CURRENT_DATE(),
'daily_data_processing',
'Business hours on weekday',
CURRENT_TIMESTAMP()
);
ELSE
-- Record the skip in the log
INSERT INTO `project.monitoring.scheduler_log` (
execution_date,
procedure_name,
execution_reason,
executed_at
) VALUES (
CURRENT_DATE(),
'daily_data_processing',
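-- CONCAT accepts only STRING arguments, so the INT64 and BOOL values are cast
CONCAT('Skipped - Day: ', current_day, ', Hour: ', CAST(current_hour AS STRING), ', Holiday: ', CAST(is_holiday AS STRING)),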
CURRENT_TIMESTAMP()
);
END IF;
END;
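The run condition in the procedure above (weekday, hour between 8 and 18 inclusive, not a holiday) is easy to get subtly wrong, so it can help to check the rule outside BigQuery first. The following Python sketch restates it as a pure function; should_run and its holidays argument are illustrative names, not part of the procedure.

```python
from datetime import date, datetime
from typing import Set


def should_run(now: datetime, holidays: Set[date]) -> bool:
    """Mirror the procedure's rule: weekday, 08:00-18:59, and not a holiday."""
    is_weekday = now.weekday() < 5          # Monday=0 ... Friday=4
    in_business_hours = 8 <= now.hour <= 18  # same bounds as BETWEEN 8 AND 18
    is_holiday = now.date() in holidays
    return is_weekday and in_business_hours and not is_holiday


if __name__ == "__main__":
    monday_morning = datetime(2024, 1, 15, 10, 0)
    print(should_run(monday_morning, holidays=set()))
```

Note that BETWEEN 8 AND 18 includes the whole 18:00 hour, which the sketch reproduces.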
2.7 Using schedule templates
-- Reusable schedule templates
CREATE OR REPLACE TABLE `project.config.schedule_templates` (
template_name STRING,
schedule_expression STRING,
timezone STRING,
description STRING,
use_case STRING
);
INSERT INTO `project.config.schedule_templates` VALUES
('daily_early_morning', 'every day 02:00', 'Asia/Seoul', 'Runs daily at 02:00', 'ETL, data cleansing'),
('daily_business_hours', 'every day 09:00', 'Asia/Seoul', 'Runs daily at 09:00', 'Business reports'),
('hourly_business', 'every 1 hours', 'Asia/Seoul', 'Runs every hour (24 hours a day)', 'Real-time monitoring'),
('weekday_evening', '0 18 * * 1-5', 'Asia/Seoul', 'Runs on weekdays at 18:00', 'Weekly analysis'),
('weekly_monday', 'every monday 08:00', 'Asia/Seoul', 'Runs every Monday at 08:00', 'Weekly reports'),
('monthly_first', '1 of month 06:00', 'Asia/Seoul', 'Runs on the 1st of every month at 06:00', 'Monthly aggregation'),
('quarterly', '0 6 1 */3 *', 'Asia/Seoul', 'Runs on the first day of each quarter at 06:00', 'Quarterly analysis');
-- Procedure that builds a schedule setup command from a template
CREATE OR REPLACE PROCEDURE `project.utils.create_scheduled_query_from_template`(
p_template_name STRING,
query_name STRING,
query_sql STRING,
target_dataset STRING,
target_table STRING
)
BEGIN
DECLARE schedule_expr STRING;
DECLARE tz STRING;
-- Look up the template. The parameter carries a p_ prefix because an
-- unqualified template_name in the WHERE clause resolves to the column,
-- which would make the filter match every row.
SET (schedule_expr, tz) = (
SELECT AS STRUCT schedule_expression, timezone
FROM `project.config.schedule_templates`
WHERE template_name = p_template_name
);
-- Output the bq command that would create the scheduled query
SELECT
CONCAT('bq mk --transfer_config --display_name="', query_name,
'" --data_source=scheduled_query --schedule="', schedule_expr,
'" --time_zone="', tz, '" --target_dataset=', target_dataset,
' --params=\'{"query":"', REPLACE(query_sql, '"', '\\"'),
'","destination_table_name_template":"', target_table,
'","write_disposition":"WRITE_TRUNCATE"}\'') as bq_command;
END;
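Building the command by string concatenation, as the procedure above does, relies on hand-written escaping of the query text. As an alternative sketch, the Python below builds the argument list from a template and lets json.dumps handle the escaping of the params payload. The TEMPLATES dictionary and build_bq_args are hypothetical; only flags that already appear in the bq mk examples earlier in this guide are used, and nothing is executed.

```python
import json
from typing import Dict, List

# Hypothetical in-memory copy of two rows from the schedule_templates table.
TEMPLATES: Dict[str, str] = {
    "daily_early_morning": "every day 02:00",
    "weekly_monday": "every monday 08:00",
}


def build_bq_args(template_name: str, query_name: str, query_sql: str,
                  target_dataset: str, target_table: str) -> List[str]:
    """Return the bq argument list for a scheduled query based on a template."""
    params = json.dumps({
        "query": query_sql,
        "destination_table_name_template": target_table,
        "write_disposition": "WRITE_TRUNCATE",
    })
    return [
        "bq", "mk", "--transfer_config",
        f"--display_name={query_name}",
        "--data_source=scheduled_query",
        f"--schedule={TEMPLATES[template_name]}",
        f"--target_dataset={target_dataset}",
        f"--params={params}",
    ]


if __name__ == "__main__":
    for arg in build_bq_args("weekly_monday", "Weekly report",
                             'SELECT "a" AS col', "reporting", "weekly_report"):
        print(arg)
```

Passing the result as a list to a process launcher avoids shell quoting entirely, which is the main reason for returning arguments rather than one command string.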
3. Building data pipelines
3.1 Basic ETL pipeline
-- Daily data aggregation pipeline
-- Schedule: "every day 02:00"
-- Step 1: Clean the raw data
CREATE OR REPLACE TABLE `project.staging.cleaned_orders` AS
SELECT
order_id,
customer_id,
PARSE_DATETIME('%Y-%m-%d %H:%M:%S', order_timestamp_str) as order_timestamp,
SAFE_CAST(amount_str AS FLOAT64) as amount,
UPPER(TRIM(status)) as status,
CURRENT_DATETIME() as processed_at
FROM `project.raw.orders`
WHERE DATE(PARSE_DATETIME('%Y-%m-%d %H:%M:%S', order_timestamp_str)) =
DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
AND SAFE_CAST(amount_str AS FLOAT64) > 0;
-- Step 2: Apply business logic
CREATE OR REPLACE TABLE `project.marts.daily_sales_summary` AS
WITH order_metrics AS (
SELECT
DATE(order_timestamp) as order_date,
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_amount,
AVG(amount) as avg_amount,
MIN(amount) as min_amount,
MAX(amount) as max_amount
FROM `project.staging.cleaned_orders`
GROUP BY order_date, customer_id
),
customer_segments AS (
SELECT
customer_id,
CASE
WHEN total_amount >= 1000 THEN 'VIP'
WHEN total_amount >= 500 THEN 'Premium'
WHEN total_amount >= 200 THEN 'Regular'
ELSE 'Basic'
END as customer_segment
FROM order_metrics
)
SELECT
om.order_date,
om.customer_id,
cs.customer_segment,
om.order_count,
om.total_amount,
om.avg_amount,
-- Each customer's share of the day's total revenue (%)
om.total_amount / SUM(om.total_amount) OVER (PARTITION BY om.order_date) * 100 as contribution_pct,
CURRENT_DATETIME() as created_at
FROM order_metrics om
JOIN customer_segments cs ON om.customer_id = cs.customer_id;
-- Step 3: Quality check
INSERT INTO `project.monitoring.data_quality_checks` (
check_date,
table_name,
check_type,
expected_value,
actual_value,
status
)
SELECT
CURRENT_DATE() as check_date,
'daily_sales_summary' as table_name,
'row_count' as check_type,
NULL as expected_value,
COUNT(*) as actual_value,
CASE WHEN COUNT(*) > 0 THEN 'PASS' ELSE 'FAIL' END as status
FROM `project.marts.daily_sales_summary`
WHERE order_date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY);
3.2 Multi-stage dependency pipeline
-- Pipeline 1: base data processing (daily at 01:00)
CREATE OR REPLACE PROCEDURE `project.etl.stage1_base_processing`()
BEGIN
-- Update customer data
MERGE `project.master.customers` target
USING (
SELECT
customer_id,
email,
first_name,
last_name,
registration_date,
last_login_date
FROM `project.raw.customer_updates`
WHERE DATE(updated_at) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
) source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET
email = source.email,
first_name = source.first_name,
last_name = source.last_name,
last_login_date = source.last_login_date,
updated_at = CURRENT_DATETIME()
WHEN NOT MATCHED THEN INSERT (
customer_id, email, first_name, last_name,
registration_date, created_at
) VALUES (
source.customer_id, source.email, source.first_name,
source.last_name, source.registration_date, CURRENT_DATETIME()
);
-- Record the stage status
INSERT INTO `project.monitoring.pipeline_status` (
pipeline_name, stage, execution_date, status, completed_at
) VALUES (
'daily_etl', 'stage1_base_processing', CURRENT_DATE(),
'COMPLETED', CURRENT_DATETIME()
);
END;
-- Pipeline 2: aggregation (daily at 02:00, after Stage 1 completes)
CREATE OR REPLACE PROCEDURE `project.etl.stage2_aggregation`()
BEGIN
DECLARE stage1_completed BOOL DEFAULT FALSE;
-- Check that Stage 1 completed
SET stage1_completed = (
SELECT COUNT(*) > 0
FROM `project.monitoring.pipeline_status`
WHERE pipeline_name = 'daily_etl'
AND stage = 'stage1_base_processing'
AND execution_date = CURRENT_DATE()
AND status = 'COMPLETED'
);
IF NOT stage1_completed THEN
RAISE USING MESSAGE = 'Stage1 not completed. Cannot proceed with Stage2.';
END IF;
-- Run the aggregation
CREATE OR REPLACE TABLE `project.marts.customer_daily_summary` AS
SELECT
c.customer_id,
c.customer_segment,
COALESCE(o.order_count, 0) as daily_orders,
COALESCE(o.daily_revenue, 0) as daily_revenue,
c.lifetime_value,
CURRENT_DATE() as summary_date
FROM `project.master.customers` c
LEFT JOIN (
SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as daily_revenue
FROM `project.staging.cleaned_orders`
WHERE DATE(order_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY customer_id
) o ON c.customer_id = o.customer_id;
-- Record Stage 2 completion
INSERT INTO `project.monitoring.pipeline_status` (
pipeline_name, stage, execution_date, status, completed_at
) VALUES (
'daily_etl', 'stage2_aggregation', CURRENT_DATE(),
'COMPLETED', CURRENT_DATETIME()
);
END;
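The dependency between the two stages rests on a simple gate: Stage 2 proceeds only if a COMPLETED row for Stage 1 exists for the same day. The sketch below models that gate in plain Python with an in-memory list standing in for the pipeline_status table; the function names are illustrative.

```python
from datetime import date
from typing import List, Tuple

# (pipeline_name, stage, execution_date, status)
StatusRow = Tuple[str, str, date, str]


def stage_completed(rows: List[StatusRow], pipeline: str, stage: str, day: date) -> bool:
    """True if a COMPLETED row exists for this pipeline, stage and day."""
    return (pipeline, stage, day, "COMPLETED") in rows


def run_stage2(rows: List[StatusRow], day: date) -> None:
    """Refuse to run Stage 2 unless Stage 1 finished on the same day."""
    if not stage_completed(rows, "daily_etl", "stage1_base_processing", day):
        raise RuntimeError("Stage1 not completed. Cannot proceed with Stage2.")
    rows.append(("daily_etl", "stage2_aggregation", day, "COMPLETED"))


if __name__ == "__main__":
    today = date(2024, 1, 15)
    status = [("daily_etl", "stage1_base_processing", today, "COMPLETED")]
    run_stage2(status, today)
    print(status[-1])
```

Raising an error rather than silently skipping means the scheduled run is marked as failed, so the missing upstream stage becomes visible.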
4. Incremental processing strategies
4.1 Date-based incremental processing
-- Scheduled query for incremental processing
-- Schedule: "every day 03:00"
-- The variable is named target_date so it cannot be confused with the
-- process_date column: "WHERE process_date = process_date" would compare
-- the column with itself and match every row.
DECLARE target_date DATE DEFAULT DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY);
DECLARE started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP();
-- 1. Check whether the date has already been processed
IF EXISTS (
SELECT 1 FROM `project.processed.daily_metrics`
WHERE process_date = target_date
) THEN
-- Delete the existing rows (reprocessing)
DELETE FROM `project.processed.daily_metrics`
WHERE process_date = target_date;
END IF;
-- 2. Process the incremental data
INSERT INTO `project.processed.daily_metrics` (
process_date,
customer_id,
total_orders,
total_revenue,
created_at
)
SELECT
target_date,
customer_id,
COUNT(*) as total_orders,
SUM(amount) as total_revenue,
CURRENT_DATETIME() as created_at
FROM `project.raw.orders`
WHERE DATE(order_timestamp) = target_date
GROUP BY customer_id;
-- 3. Record the processing log
INSERT INTO `project.monitoring.processing_log` (
process_date,
table_name,
records_processed,
processing_time_seconds,
status
)
SELECT
target_date,
'daily_metrics' as table_name,
COUNT(*) as records_processed,
-- Elapsed time since the script started
TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), started_at, SECOND) as processing_time_seconds,
'SUCCESS' as status
FROM `project.processed.daily_metrics`
WHERE process_date = target_date;
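The delete-then-insert pattern above is what makes a daily load safe to re-run. The following sketch demonstrates that property with Python's built-in sqlite3 module standing in for BigQuery; the table layout is simplified and the function name load_daily_metrics is hypothetical.

```python
import sqlite3
from typing import List, Tuple


def load_daily_metrics(conn: sqlite3.Connection, target_date: str) -> None:
    """Rebuild the rows for one date; running it twice gives the same result."""
    with conn:  # delete and insert are committed together
        conn.execute("DELETE FROM daily_metrics WHERE process_date = ?", (target_date,))
        conn.execute(
            "INSERT INTO daily_metrics (process_date, customer_id, total_orders, total_revenue) "
            "SELECT ?, customer_id, COUNT(*), SUM(amount) FROM orders "
            "WHERE order_date = ? GROUP BY customer_id",
            (target_date, target_date),
        )


def demo() -> List[Tuple[str, int, float]]:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (customer_id TEXT, order_date TEXT, amount REAL)")
    conn.execute(
        "CREATE TABLE daily_metrics "
        "(process_date TEXT, customer_id TEXT, total_orders INTEGER, total_revenue REAL)"
    )
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [("c1", "2024-01-14", 10.0), ("c1", "2024-01-14", 5.0),
         ("c2", "2024-01-14", 7.5), ("c2", "2024-01-13", 1.0)],
    )
    load_daily_metrics(conn, "2024-01-14")
    load_daily_metrics(conn, "2024-01-14")  # re-run: no duplicate rows
    return conn.execute(
        "SELECT customer_id, total_orders, total_revenue FROM daily_metrics ORDER BY customer_id"
    ).fetchall()


if __name__ == "__main__":
    print(demo())
```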
4.2 CDC (Change Data Capture) pattern
-- Incremental processing based on change data capture
CREATE OR REPLACE PROCEDURE `project.etl.cdc_customer_processing`()
BEGIN
DECLARE last_processed_timestamp TIMESTAMP;
DECLARE current_batch_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP();
-- Look up the last processed timestamp
SET last_processed_timestamp = (
SELECT MAX(last_processed_timestamp)
FROM `project.monitoring.cdc_watermarks`
WHERE table_name = 'customers'
);
-- Default value for the first run
IF last_processed_timestamp IS NULL THEN
SET last_processed_timestamp = TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY);
END IF;
-- Process only the changed records
MERGE `project.processed.customers` target
USING (
SELECT
customer_id,
email,
first_name,
last_name,
status,
updated_at,
_change_type -- INSERT, UPDATE, DELETE
FROM `project.raw.customers_cdc`
WHERE updated_at > last_processed_timestamp
AND updated_at <= current_batch_timestamp
) source
ON target.customer_id = source.customer_id
-- A MERGE clause must start with WHEN MATCHED or WHEN NOT MATCHED
WHEN MATCHED AND source._change_type = 'DELETE' THEN
DELETE
WHEN MATCHED AND source._change_type = 'UPDATE' THEN UPDATE SET
email = source.email,
first_name = source.first_name,
last_name = source.last_name,
status = source.status,
updated_at = source.updated_at
WHEN NOT MATCHED AND source._change_type = 'INSERT' THEN INSERT (
customer_id, email, first_name, last_name, status, created_at
) VALUES (
source.customer_id, source.email, source.first_name,
source.last_name, source.status, CURRENT_TIMESTAMP()
);
-- Update the watermark
MERGE `project.monitoring.cdc_watermarks` target
USING (SELECT 'customers' as table_name, current_batch_timestamp as last_processed_timestamp) source
ON target.table_name = source.table_name
WHEN MATCHED THEN UPDATE SET
last_processed_timestamp = source.last_processed_timestamp,
updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT (
table_name, last_processed_timestamp, created_at
) VALUES (
source.table_name, source.last_processed_timestamp, CURRENT_TIMESTAMP()
);
END;
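The watermark logic above processes exactly the changes in the half-open window (last watermark, current batch timestamp]. The Python sketch below shows that window on an in-memory change list, using integers as stand-in timestamps. It is a simplification: unlike the MERGE, it treats INSERT and UPDATE alike as an upsert, and the names are illustrative.

```python
from typing import Dict, List, Tuple

# (updated_at, change_type, customer_id, email); integers stand in for timestamps
Change = Tuple[int, str, str, str]


def apply_changes(target: Dict[str, str], changes: List[Change],
                  last_watermark: int, batch_watermark: int) -> int:
    """Apply changes with last_watermark < updated_at <= batch_watermark, oldest first."""
    window = sorted(c for c in changes if last_watermark < c[0] <= batch_watermark)
    for _, change_type, customer_id, email in window:
        if change_type == "DELETE":
            target.pop(customer_id, None)
        else:  # INSERT or UPDATE
            target[customer_id] = email
    return batch_watermark  # becomes the watermark for the next run


if __name__ == "__main__":
    customers = {"a": "old@example.com"}
    log = [
        (1, "UPDATE", "a", "stale@example.com"),  # before the window: ignored
        (5, "UPDATE", "a", "new@example.com"),
        (6, "INSERT", "b", "b@example.com"),
        (20, "INSERT", "c", "c@example.com"),     # after the window: next run
    ]
    print(apply_changes(customers, log, last_watermark=2, batch_watermark=10), customers)
```

Fixing the upper bound before reading means rows that arrive during the run are picked up by the next run instead of being skipped.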
4.3 Batch size optimization
-- Batch processing for large data volumes
CREATE OR REPLACE PROCEDURE `project.etl.batch_processing`()
BEGIN
DECLARE batch_size INT64 DEFAULT 100000;
DECLARE batch_count INT64;
DECLARE total_records INT64;
DECLARE processed_records INT64 DEFAULT 0;
-- Count the records to process
SET total_records = (
SELECT COUNT(*)
FROM `project.raw.large_dataset`
WHERE process_flag = 'PENDING'
);
-- Batch loop
WHILE processed_records < total_records DO
-- Select the next batch. No OFFSET is applied: processed rows leave the
-- PENDING set, so the oldest batch_size pending rows are always the next
-- batch. An increasing OFFSET would skip rows. QUALIFY is used because the
-- batch size is held in a variable.
CREATE OR REPLACE TEMP TABLE current_batch AS
SELECT *
FROM `project.raw.large_dataset`
WHERE process_flag = 'PENDING'
QUALIFY ROW_NUMBER() OVER (ORDER BY created_at) <= batch_size;
SET batch_count = (SELECT COUNT(*) FROM current_batch);
-- Stop if no pending rows remain
IF batch_count = 0 THEN
BREAK;
END IF;
-- Processing logic
INSERT INTO `project.processed.results` (
record_id,
processed_data,
processing_timestamp
)
SELECT
record_id,
UPPER(raw_data) as processed_data,
CURRENT_TIMESTAMP()
FROM current_batch;
-- Mark the batch as completed
UPDATE `project.raw.large_dataset`
SET process_flag = 'COMPLETED',
processed_at = CURRENT_TIMESTAMP()
WHERE record_id IN (SELECT record_id FROM current_batch);
-- Update progress with the number of rows actually processed
SET processed_records = processed_records + batch_count;
-- Log progress
INSERT INTO `project.monitoring.batch_progress` (
batch_timestamp,
processed_records,
total_records,
progress_pct
) VALUES (
CURRENT_TIMESTAMP(),
processed_records,
total_records,
processed_records / total_records * 100
);
END WHILE;
END;
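One pitfall in this kind of loop is combining an increasing OFFSET with a filter on rows that the loop itself marks as completed. The Python sketch below simulates both variants on a small in-memory table to show the difference; process_in_batches and its use_offset switch are purely illustrative.

```python
from typing import Dict, List


def process_in_batches(flags: Dict[int, str], batch_size: int, use_offset: bool) -> List[int]:
    """Process PENDING records in batches and return the ids that were processed."""
    processed: List[int] = []
    offset = 0
    while True:
        pending = sorted(k for k, v in flags.items() if v == "PENDING")
        start = offset if use_offset else 0
        batch = pending[start:start + batch_size]
        if not batch:
            break
        for record_id in batch:
            flags[record_id] = "COMPLETED"  # the row leaves the PENDING set
        processed.extend(batch)
        offset += batch_size
    return processed


if __name__ == "__main__":
    fresh = lambda: {i: "PENDING" for i in range(1, 7)}
    print(process_in_batches(fresh(), 2, use_offset=False))  # [1, 2, 3, 4, 5, 6]
    print(process_in_batches(fresh(), 2, use_offset=True))   # [1, 2, 5, 6]
```

With the offset variant, records 3 and 4 are never selected because the pending set shrinks underneath the moving offset.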
5. Error handling and retries
5.1 Basic error handling
-- Scheduled query with error handling
CREATE OR REPLACE PROCEDURE `project.etl.robust_data_processing`()
BEGIN
-- Variables carry a v_ prefix so they are not shadowed by the execution_log
-- columns of the same name; "WHERE execution_id = execution_id" would
-- compare the column with itself and update every row.
DECLARE v_error_message STRING;
DECLARE v_execution_id STRING DEFAULT GENERATE_UUID();
-- Log the start of the execution
INSERT INTO `project.monitoring.execution_log` (
execution_id,
procedure_name,
status,
started_at
) VALUES (
v_execution_id,
'robust_data_processing',
'STARTED',
CURRENT_TIMESTAMP()
);
BEGIN
-- Main processing logic
CREATE OR REPLACE TABLE `project.temp.processing_result` AS
SELECT
customer_id,
SUM(order_amount) as total_amount,
COUNT(*) as order_count
FROM `project.raw.orders`
WHERE DATE(order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY customer_id;
-- Data quality check
IF (SELECT COUNT(*) FROM `project.temp.processing_result`) = 0 THEN
RAISE USING MESSAGE = 'No data found for processing date';
END IF;
-- Insert into the result table
INSERT INTO `project.processed.daily_customer_summary`
SELECT *, CURRENT_TIMESTAMP() as processed_at
FROM `project.temp.processing_result`;
-- Log success
UPDATE `project.monitoring.execution_log`
SET status = 'COMPLETED',
completed_at = CURRENT_TIMESTAMP(),
records_processed = (SELECT COUNT(*) FROM `project.temp.processing_result`)
WHERE execution_id = v_execution_id;
EXCEPTION WHEN ERROR THEN
-- Capture the error details
SET v_error_message = @@error.message;
-- Log the error
UPDATE `project.monitoring.execution_log`
SET status = 'FAILED',
error_message = v_error_message,
failed_at = CURRENT_TIMESTAMP()
WHERE execution_id = v_execution_id;
-- Send an alert
INSERT INTO `project.notifications.error_alerts` (
alert_timestamp,
alert_type,
procedure_name,
error_message,
execution_id
) VALUES (
CURRENT_TIMESTAMP(),
'PROCESSING_ERROR',
'robust_data_processing',
v_error_message,
v_execution_id
);
-- Re-raise the error so the scheduled query is marked as failed
RAISE USING MESSAGE = v_error_message;
END;
END;
5.2 Retry with exponential backoff
-- Procedure with retry logic
CREATE OR REPLACE PROCEDURE `project.etl.processing_with_retry`()
BEGIN
DECLARE retry_count INT64 DEFAULT 0;
DECLARE max_retries INT64 DEFAULT 3;
DECLARE base_delay_seconds INT64 DEFAULT 60;
DECLARE success BOOL DEFAULT FALSE;
DECLARE error_message STRING;
WHILE retry_count <= max_retries AND NOT success DO
BEGIN
-- Log the start of the attempt
INSERT INTO `project.monitoring.retry_log` (
retry_timestamp,
procedure_name,
retry_attempt,
status
) VALUES (
CURRENT_TIMESTAMP(),
'processing_with_retry',
retry_count,
'ATTEMPTING'
);
-- Main processing logic
CALL `project.etl.main_processing_logic`();
-- Set the success flag
SET success = TRUE;
-- Log success
UPDATE `project.monitoring.retry_log`
SET status = 'SUCCESS'
WHERE procedure_name = 'processing_with_retry'
AND retry_attempt = retry_count
AND DATE(retry_timestamp) = CURRENT_DATE();
EXCEPTION WHEN ERROR THEN
SET error_message = @@error.message;
SET retry_count = retry_count + 1;
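-- Log the failure. @@error.message is used directly because an unqualified
-- error_message on the right-hand side would refer to the column itself.
UPDATE `project.monitoring.retry_log`
SET status = 'FAILED',
error_message = @@error.message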
WHERE procedure_name = 'processing_with_retry'
AND retry_attempt = retry_count - 1
AND DATE(retry_timestamp) = CURRENT_DATE();
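-- Backoff between retries (60, 120, 240 seconds): BigQuery scripting has no
-- SLEEP function, so the delay has to be enforced outside the script, for
-- example by the orchestrator that invokes this procedure. Without it, the
-- loop retries immediately.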
END;
END WHILE;
-- Handle the final failure
IF NOT success THEN
INSERT INTO `project.notifications.critical_alerts` (
alert_timestamp,
alert_type,
procedure_name,
final_error_message,
retry_attempts
) VALUES (
CURRENT_TIMESTAMP(),
'MAX_RETRIES_EXCEEDED',
'processing_with_retry',
error_message,
max_retries
);
RAISE USING MESSAGE = CONCAT('Max retries exceeded: ', error_message);
END IF;
END;
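The backoff schedule described above (60, 120, then 240 seconds for a base delay of 60 and three retries) can be sketched in Python as follows. This is a minimal illustration run by a caller outside BigQuery, not part of the procedure; run_with_retry is a hypothetical helper, and the sleep function is injectable so the delays can be inspected without waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retry(task: Callable[[], T], max_retries: int = 3,
                   base_delay_seconds: int = 60,
                   sleep: Callable[[float], None] = time.sleep) -> T:
    """Run task; on failure wait base * 2**n seconds and retry, up to max_retries times."""
    for attempt in range(max_retries + 1):
        try:
            return task()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            sleep(base_delay_seconds * 2 ** attempt)
    raise AssertionError("unreachable")


if __name__ == "__main__":
    delays = []
    try:
        run_with_retry(lambda: 1 / 0, sleep=delays.append)
    except ZeroDivisionError:
        pass
    print(delays)  # [60, 120, 240]
```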
5.3 Handling partial failures
-- Fault-tolerant processing that allows partial failures
CREATE OR REPLACE PROCEDURE `project.etl.fault_tolerant_processing`()
BEGIN
DECLARE total_batches INT64;
DECLARE successful_batches INT64 DEFAULT 0;
DECLARE failed_batches INT64 DEFAULT 0;
-- Build the list of batches
CREATE OR REPLACE TEMP TABLE batch_list AS
SELECT
customer_segment,
COUNT(*) as customer_count
FROM `project.raw.customers`
GROUP BY customer_segment;
SET total_batches = (SELECT COUNT(*) FROM batch_list);
-- Process each batch
FOR batch IN (SELECT customer_segment FROM batch_list) DO
BEGIN
-- Process a single batch
INSERT INTO `project.processed.segment_summary` (
segment,
customer_count,
avg_order_value,
processing_date
)
SELECT
c.customer_segment,
COUNT(*) as customer_count,
AVG(o.order_amount) as avg_order_value,
CURRENT_DATE()
FROM `project.raw.customers` c
LEFT JOIN `project.raw.orders` o ON c.customer_id = o.customer_id
WHERE c.customer_segment = batch.customer_segment
AND DATE(o.order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY c.customer_segment;
SET successful_batches = successful_batches + 1;
-- Log batch success
INSERT INTO `project.monitoring.batch_results` (
batch_date,
batch_id,
batch_status,
processed_at
) VALUES (
CURRENT_DATE(),
batch.customer_segment,
'SUCCESS',
CURRENT_TIMESTAMP()
);
EXCEPTION WHEN ERROR THEN
SET failed_batches = failed_batches + 1;
-- Log batch failure
INSERT INTO `project.monitoring.batch_results` (
batch_date,
batch_id,
batch_status,
error_message,
processed_at
) VALUES (
CURRENT_DATE(),
batch.customer_segment,
'FAILED',
@@error.message,
CURRENT_TIMESTAMP()
);
END;
END FOR;
-- Evaluate the overall result
INSERT INTO `project.monitoring.job_summary` (
job_date,
job_name,
total_batches,
successful_batches,
failed_batches,
success_rate,
job_status
) VALUES (
CURRENT_DATE(),
'fault_tolerant_processing',
total_batches,
successful_batches,
failed_batches,
SAFE_DIVIDE(successful_batches, total_batches) * 100,
CASE
WHEN failed_batches = 0 THEN 'FULL_SUCCESS'
WHEN successful_batches > 0 THEN 'PARTIAL_SUCCESS'
ELSE 'FULL_FAILURE'
END
);
-- Raise an alert if the failure rate exceeds 50%
IF SAFE_DIVIDE(failed_batches, total_batches) > 0.5 THEN
INSERT INTO `project.notifications.job_alerts` (
alert_timestamp,
job_name,
alert_type,
message
) VALUES (
CURRENT_TIMESTAMP(),
'fault_tolerant_processing',
'HIGH_FAILURE_RATE',
CONCAT('Job completed with ', CAST(failed_batches AS STRING), ' out of ', CAST(total_batches AS STRING), ' batches failed')
);
END IF;
END;
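The final evaluation step reduces the batch counts to a job status, a success rate, and an alert decision. The sketch below expresses the same rules in Python, including the case where there are no batches at all, which would otherwise divide by zero. The function name summarize is illustrative.

```python
from typing import Optional, Tuple


def summarize(successful: int, failed: int) -> Tuple[str, Optional[float], bool]:
    """Return (job_status, success_rate_pct, alert) for a finished job."""
    total = successful + failed
    success_rate = successful / total * 100 if total else None
    if failed == 0:
        status = "FULL_SUCCESS"
    elif successful > 0:
        status = "PARTIAL_SUCCESS"
    else:
        status = "FULL_FAILURE"
    alert = total > 0 and failed / total > 0.5  # alert above a 50% failure rate
    return status, success_rate, alert


if __name__ == "__main__":
    print(summarize(3, 1))  # ('PARTIAL_SUCCESS', 75.0, False)
    print(summarize(1, 3))  # ('PARTIAL_SUCCESS', 25.0, True)
```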
6. Monitoring and alerting
6.1 Monitoring execution status
-- Dashboard view for monitoring scheduled query executions
CREATE OR REPLACE VIEW `project.monitoring.scheduled_query_dashboard` AS
WITH execution_summary AS (
SELECT
job_id,
user_email,
project_id,
job_type,
statement_type,
start_time,
end_time,
TIMESTAMP_DIFF(end_time, start_time, SECOND) as duration_seconds,
state,
error_result,
total_bytes_processed,
total_slot_ms,
creation_time
FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE DATE(creation_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
AND job_type = 'QUERY'
-- Jobs created by scheduled queries have job IDs that start with scheduled_query_
AND STARTS_WITH(job_id, 'scheduled_query_')
),
daily_stats AS (
SELECT
DATE(start_time) as execution_date,
COUNT(*) as total_executions,
COUNT(CASE WHEN state = 'DONE' AND error_result IS NULL THEN 1 END) as successful_executions,
COUNT(CASE WHEN state = 'DONE' AND error_result IS NOT NULL THEN 1 END) as failed_executions,
AVG(duration_seconds) as avg_duration_seconds,
MAX(duration_seconds) as max_duration_seconds,
SUM(total_bytes_processed) / 1024 / 1024 / 1024 as total_gb_processed,
SUM(total_slot_ms) / 1000 / 60 / 60 as total_slot_hours
FROM execution_summary
WHERE start_time IS NOT NULL
GROUP BY execution_date
)
SELECT
execution_date,
total_executions,
successful_executions,
failed_executions,
ROUND(successful_executions / total_executions * 100, 2) as success_rate_pct,
ROUND(avg_duration_seconds, 2) as avg_duration_seconds,
max_duration_seconds,
ROUND(total_gb_processed, 2) as total_gb_processed,
ROUND(total_slot_hours, 2) as total_slot_hours
FROM daily_stats
ORDER BY execution_date DESC;
6.2 Performance alerts
-- Detect performance anomalies and raise alerts
CREATE OR REPLACE PROCEDURE `project.monitoring.performance_alerts`()
BEGIN
DECLARE avg_duration FLOAT64;
DECLARE current_duration FLOAT64;
DECLARE threshold_multiplier FLOAT64 DEFAULT 2.0;
-- Average execution time over the past 7 days
SET avg_duration = (
SELECT AVG(TIMESTAMP_DIFF(end_time, start_time, SECOND))
FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE DATE(creation_time) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 8 DAY)
AND DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
AND job_type = 'QUERY'
AND STARTS_WITH(job_id, 'scheduled_query_')
AND state = 'DONE'
AND error_result IS NULL
);
-- Duration of the slowest query run today
SET current_duration = (
SELECT MAX(TIMESTAMP_DIFF(end_time, start_time, SECOND))
FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE DATE(creation_time) = CURRENT_DATE()
AND job_type = 'QUERY'
AND STARTS_WITH(job_id, 'scheduled_query_')
AND state = 'DONE'
);
-- Detect a performance anomaly
IF current_duration > avg_duration * threshold_multiplier THEN
INSERT INTO `project.notifications.performance_alerts` (
alert_timestamp,
alert_type,
current_duration_seconds,
average_duration_seconds,
threshold_exceeded_by,
recommended_action
) VALUES (
CURRENT_TIMESTAMP(),
'SLOW_EXECUTION',
current_duration,
avg_duration,
ROUND((current_duration / avg_duration - 1) * 100, 2),
'Review query performance and consider optimization'
);
END IF;
-- Detect abnormal slot usage
IF EXISTS (
SELECT 1
FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE DATE(creation_time) = CURRENT_DATE()
AND total_slot_ms > 3600000 -- more than one slot-hour used
AND job_type = 'QUERY'
) THEN
INSERT INTO `project.notifications.resource_alerts` (
alert_timestamp,
alert_type,
message
) VALUES (
CURRENT_TIMESTAMP(),
'HIGH_SLOT_USAGE',
'Scheduled query used excessive slot time today'
);
END IF;
END;
6.3 Data quality alerts
-- Monitor data quality and raise alerts
CREATE OR REPLACE PROCEDURE `project.monitoring.data_quality_alerts`()
BEGIN
-- Detect data volume anomalies
DECLARE yesterday_count INT64;
DECLARE today_count INT64;
DECLARE volume_change_pct FLOAT64;
SET yesterday_count = (
SELECT COUNT(*)
FROM `project.processed.daily_summary`
WHERE summary_date = DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)
);
SET today_count = (
SELECT COUNT(*)
FROM `project.processed.daily_summary`
WHERE summary_date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
);
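-- SAFE_DIVIDE returns NULL instead of failing when yesterday_count is 0
SET volume_change_pct = SAFE_DIVIDE(today_count - yesterday_count, yesterday_count) * 100;
-- Raise an alert if the volume changed by more than 30%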
IF ABS(volume_change_pct) > 30 THEN
INSERT INTO `project.notifications.data_quality_alerts` (
alert_timestamp,
alert_type,
table_name,
metric_name,
expected_value,
actual_value,
deviation_pct
) VALUES (
CURRENT_TIMESTAMP(),
'VOLUME_ANOMALY',
'daily_summary',
'row_count',
yesterday_count,
today_count,
volume_change_pct
);
END IF;
-- Check the ratio of NULL values
FOR table_record IN (
SELECT
table_name,
column_name,
null_count,
total_count,
SAFE_DIVIDE(null_count, total_count) * 100 as null_pct
FROM (
SELECT
'customer_summary' as table_name,
'customer_id' as column_name,
COUNTIF(customer_id IS NULL) as null_count,
COUNT(*) as total_count
FROM `project.processed.customer_summary`
WHERE summary_date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
)
WHERE SAFE_DIVIDE(null_count, total_count) > 0.05 -- more than 5% NULL
) DO
INSERT INTO `project.notifications.data_quality_alerts` (
alert_timestamp,
alert_type,
table_name,
column_name,
metric_name,
threshold_value,
actual_value
) VALUES (
CURRENT_TIMESTAMP(),
'HIGH_NULL_RATE',
table_record.table_name,
table_record.column_name,
'null_percentage',
5.0,
table_record.null_pct
);
END FOR;
-- Detect duplicate records
IF EXISTS (
SELECT customer_id
FROM `project.processed.daily_summary`
WHERE summary_date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY customer_id
HAVING COUNT(*) > 1
) THEN
INSERT INTO `project.notifications.data_quality_alerts` (
alert_timestamp,
alert_type,
table_name,
message
) VALUES (
CURRENT_TIMESTAMP(),
'DUPLICATE_RECORDS',
'daily_summary',
'Duplicate customer records found in daily summary'
);
END IF;
END;
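The volume check compares the latest day's row count with the previous day's and alerts when the change exceeds 30% in either direction. The sketch below restates that rule in Python and makes the empty-baseline case explicit: when the previous count is zero, no percentage can be computed, so this version reports no anomaly. Both function names are illustrative, and treating a zero baseline as "no alert" is an assumption that a real pipeline may want to handle differently.

```python
from typing import Optional


def volume_change_pct(previous_count: int, current_count: int) -> Optional[float]:
    """Percentage change between two row counts, or None if the baseline is 0."""
    if previous_count == 0:
        return None
    return (current_count - previous_count) / previous_count * 100


def is_volume_anomaly(previous_count: int, current_count: int,
                      threshold_pct: float = 30.0) -> bool:
    """True if the absolute change exceeds the threshold."""
    change = volume_change_pct(previous_count, current_count)
    return change is not None and abs(change) > threshold_pct


if __name__ == "__main__":
    print(volume_change_pct(100, 140), is_volume_anomaly(100, 140))  # 40.0 True
    print(volume_change_pct(100, 120), is_volume_anomaly(100, 120))  # 20.0 False
```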
7. Performance optimization
7.1 Query optimization
-- Inefficient scheduled query (pattern to avoid)
/*
SELECT
customer_id,
(SELECT COUNT(*) FROM orders o WHERE o.customer_id = c.customer_id) as order_count,
(SELECT SUM(amount) FROM orders o WHERE o.customer_id = c.customer_id) as total_spent
FROM customers c; -- correlated subqueries are evaluated per customer (N+1 problem)
*/
-- Optimized scheduled query
CREATE OR REPLACE TABLE `project.marts.customer_metrics` AS
SELECT
c.customer_id,
c.customer_name,
c.registration_date,
COALESCE(o.order_count, 0) as order_count,
COALESCE(o.total_spent, 0) as total_spent,
COALESCE(o.avg_order_value, 0) as avg_order_value,
CURRENT_TIMESTAMP() as updated_at
FROM `project.master.customers` c
LEFT JOIN (
SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_spent,
AVG(amount) as avg_order_value
FROM `project.raw.orders`
WHERE DATE(order_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY customer_id
) o ON c.customer_id = o.customer_id
WHERE c.status = 'ACTIVE';
7.2 Optimizing with partitions
-- Efficient processing with partitioned tables
-- Schedule: "every day 01:00"
-- Explicit date filter for partition pruning
DECLARE target_date DATE DEFAULT DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY);
-- Delete existing data for the partition (when reprocessing)
DELETE FROM `project.partitioned.daily_aggregates`
WHERE partition_date = target_date;
-- Insert the data for the partition
INSERT INTO `project.partitioned.daily_aggregates` (
partition_date,
customer_id,
order_count,
total_revenue,
created_at
)
SELECT
target_date as partition_date,
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_revenue,
CURRENT_TIMESTAMP() as created_at
FROM `project.partitioned.orders`
WHERE DATE(_PARTITIONTIME) = target_date -- partition filter
AND amount > 0
GROUP BY customer_id;
-- Update partition metadata
INSERT INTO `project.monitoring.partition_stats` (
table_name,
partition_date,
record_count,
size_gb,
created_at
)
SELECT
'daily_aggregates' as table_name,
target_date as partition_date,
COUNT(*) as record_count,
-- Placeholder: look up the actual size in INFORMATION_SCHEMA
0 as size_gb,
CURRENT_TIMESTAMP() as created_at
FROM `project.partitioned.daily_aggregates`
WHERE partition_date = target_date;
7.3 Optimizing with clustering
-- Improve performance with a clustered table
CREATE OR REPLACE TABLE `project.optimized.customer_orders` (
customer_id STRING,
order_date DATE,
product_category STRING,
order_amount FLOAT64,
order_count INT64,
created_at TIMESTAMP
)
PARTITION BY order_date
CLUSTER BY customer_id, product_category;
-- Efficient load into the clustered table
-- Schedule: "every day 02:00"
INSERT INTO `project.optimized.customer_orders` (
customer_id,
order_date,
product_category,
order_amount,
order_count,
created_at
)
SELECT
o.customer_id,
DATE(o.order_timestamp) as order_date,
p.category as product_category,
SUM(o.amount) as order_amount,
COUNT(*) as order_count,
CURRENT_TIMESTAMP() as created_at
FROM `project.raw.orders` o
JOIN `project.master.products` p ON o.product_id = p.product_id
WHERE DATE(o.order_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY
o.customer_id,
DATE(o.order_timestamp),
p.category;
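-- Check the clustering configuration.
-- Clustering columns are exposed through INFORMATION_SCHEMA.COLUMNS.
SELECT
table_name,
column_name,
clustering_ordinal_position
FROM `project.optimized.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'customer_orders'
AND clustering_ordinal_position IS NOT NULL
ORDER BY clustering_ordinal_position;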
8. Real-world use cases
8.1 Preparing data for a real-time dashboard
-- Data pipeline for a real-time business dashboard
-- Schedule: "every 15 minutes"
-- 1. Compute real-time KPIs
CREATE OR REPLACE TABLE `project.dashboard.realtime_kpis` AS
WITH current_metrics AS (
SELECT
CURRENT_DATETIME() as snapshot_time,
-- Sales metrics
COUNT(DISTINCT order_id) as orders_last_15min,
SUM(order_amount) as revenue_last_15min,
COUNT(DISTINCT customer_id) as active_customers_last_15min,
-- Conversion metrics
COUNT(DISTINCT CASE WHEN funnel_step = 'purchase' THEN session_id END) as conversions_last_15min,
COUNT(DISTINCT session_id) as total_sessions_last_15min
FROM `project.realtime.user_events`
WHERE event_timestamp >= DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 15 MINUTE)
),
comparison_metrics AS (
SELECT
AVG(orders_last_15min) as avg_orders_15min,
AVG(revenue_last_15min) as avg_revenue_15min
FROM `project.dashboard.realtime_kpis`
WHERE DATE(snapshot_time) = CURRENT_DATE()
AND snapshot_time <= DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 15 MINUTE)
)
SELECT
cm.*,
-- Change relative to the average of earlier snapshots today
CASE
WHEN comp.avg_orders_15min > 0 THEN
(cm.orders_last_15min - comp.avg_orders_15min) / comp.avg_orders_15min * 100
ELSE 0
END as orders_change_pct,
CASE
WHEN comp.avg_revenue_15min > 0 THEN
(cm.revenue_last_15min - comp.avg_revenue_15min) / comp.avg_revenue_15min * 100
ELSE 0
END as revenue_change_pct,
-- Conversion rate
CASE
WHEN cm.total_sessions_last_15min > 0 THEN
cm.conversions_last_15min / cm.total_sessions_last_15min * 100
ELSE 0
END as conversion_rate_pct
FROM current_metrics cm
CROSS JOIN comparison_metrics comp;
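-- 2. Regional performance analysis
CREATE OR REPLACE TABLE `project.dashboard.regional_performance` AS
SELECT
region,
COUNT(DISTINCT IF(is_current, order_id, NULL)) as orders,
SUM(IF(is_current, order_amount, NULL)) as revenue,
COUNT(DISTINCT IF(is_current, customer_id, NULL)) as customers,
AVG(IF(is_current, order_amount, NULL)) as avg_order_value,
-- Same one-hour window yesterday. A LAG over the grouped result cannot provide
-- this: each region has a single row, so LAG would always return NULL.
COUNT(DISTINCT IF(NOT is_current, order_id, NULL)) as orders_same_time_yesterday,
CURRENT_DATETIME() as updated_at
FROM (
SELECT
*,
event_timestamp >= DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 1 HOUR) as is_current
FROM `project.realtime.user_events`
WHERE (event_timestamp >= DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 1 HOUR)
OR (event_timestamp >= DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 25 HOUR)
AND event_timestamp < DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 24 HOUR)))
AND funnel_step = 'purchase'
)
GROUP BY region;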
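The change-rate and conversion-rate columns in the KPI query above all use the same guarded division: compute the ratio only when the denominator is positive, otherwise fall back to 0. A minimal Python sketch of that arithmetic follows; the function names are illustrative and not part of the pipeline.

```python
from typing import Optional


def pct_change(current: float, baseline: Optional[float]) -> float:
    """Percent change of current vs. baseline; 0.0 when no usable baseline exists."""
    # Mirrors `CASE WHEN baseline > 0 THEN ... ELSE 0 END`: a missing (NULL)
    # or non-positive baseline falls through to the ELSE branch.
    if baseline is None or baseline <= 0:
        return 0.0
    return (current - baseline) / baseline * 100


def conversion_rate_pct(conversions: int, sessions: int) -> float:
    """Share of sessions that reached the purchase step, in percent."""
    if sessions <= 0:
        return 0.0
    return conversions / sessions * 100
```

Returning 0 for a missing baseline keeps the dashboard free of NULLs, but it also makes "no baseline yet" indistinguishable from "no change"; a separate flag column may be worth adding if that distinction matters.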
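8.2 Automating Customer Segmentation
-- Daily customer segmentation update
-- Schedule: "every day 04:00" (the scheduled query itself runs CALL `project.ml.update_customer_segments`();)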
CREATE OR REPLACE PROCEDURE `project.ml.update_customer_segments`()
BEGIN
-- Segmentation based on RFM analysis
CREATE OR REPLACE TABLE `project.analytics.customer_rfm_scores` AS
WITH customer_metrics AS (
SELECT
customer_id,
DATE_DIFF(CURRENT_DATE(), MAX(order_date), DAY) as recency_days,
COUNT(DISTINCT order_id) as frequency_orders,
SUM(order_amount) as monetary_total,
AVG(order_amount) as monetary_avg,
MIN(order_date) as first_order_date,
MAX(order_date) as last_order_date
FROM `project.raw.orders`
WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
GROUP BY customer_id
),
rfm_scores AS (
SELECT
customer_id,
recency_days,
frequency_orders,
monetary_total,
-- RFM scores (1-5)
CASE
WHEN recency_days <= 30 THEN 5
WHEN recency_days <= 90 THEN 4
WHEN recency_days <= 180 THEN 3
WHEN recency_days <= 365 THEN 2
ELSE 1
END as recency_score,
NTILE(5) OVER (ORDER BY frequency_orders) as frequency_score,
NTILE(5) OVER (ORDER BY monetary_total) as monetary_score
FROM customer_metrics
)
SELECT
customer_id,
recency_days,
frequency_orders,
monetary_total,
recency_score,
frequency_score,
monetary_score,
-- Overall segment classification
CASE
WHEN recency_score >= 4 AND frequency_score >= 4 AND monetary_score >= 4 THEN 'Champions'
WHEN recency_score >= 3 AND frequency_score >= 3 AND monetary_score >= 4 THEN 'Loyal Customers'
WHEN recency_score >= 4 AND frequency_score <= 2 THEN 'New Customers'
WHEN recency_score >= 3 AND frequency_score >= 3 AND monetary_score <= 2 THEN 'Potential Loyalists'
WHEN recency_score >= 3 AND frequency_score <= 2 AND monetary_score <= 2 THEN 'Promising'
WHEN recency_score <= 2 AND frequency_score >= 3 AND monetary_score >= 3 THEN 'At Risk'
WHEN recency_score <= 2 AND frequency_score >= 3 AND monetary_score <= 2 THEN 'Cannot Lose Them'
WHEN recency_score <= 2 AND frequency_score <= 2 AND monetary_score >= 3 THEN 'Hibernating'
ELSE 'Lost'
END as customer_segment,
CURRENT_DATE() as segment_date
FROM rfm_scores;
-- Generate marketing actions per segment
CREATE OR REPLACE TABLE `project.marketing.segment_actions` AS
SELECT
customer_segment,
COUNT(*) as customer_count,
AVG(monetary_total) as avg_customer_value,
-- Recommended action per segment
CASE customer_segment
WHEN 'Champions' THEN 'Reward loyalty, ask for referrals, offer new products'
WHEN 'Loyal Customers' THEN 'Upsell higher value products, offer loyalty programs'
WHEN 'New Customers' THEN 'Provide onboarding support, build relationship'
WHEN 'At Risk' THEN 'Send personalized offers, provide special customer support'
WHEN 'Cannot Lose Them' THEN 'Aggressive retention campaign, exclusive offers'
WHEN 'Lost' THEN 'Ignore or very low-cost re-engagement campaigns'
ELSE 'Standard marketing approach'
END as recommended_action,
-- Suggested marketing budget (based on customer value)
COUNT(*) * AVG(monetary_total) * 0.05 as suggested_marketing_budget,
CURRENT_DATE() as action_date
FROM `project.analytics.customer_rfm_scores`
GROUP BY customer_segment;
-- Track segment changes
-- The RFM table above is rebuilt with today's segment_date on every run, so it no
-- longer holds yesterday's rows. The previous segment is therefore taken from each
-- customer's latest entry in segment_history. The aliases are cur/prev because
-- CURRENT is a reserved keyword.
INSERT INTO `project.analytics.segment_history` (
tracking_date,
customer_id,
previous_segment,
current_segment,
segment_change_type
)
SELECT
CURRENT_DATE(),
COALESCE(cur.customer_id, prev.customer_id),
prev.customer_segment,
cur.customer_segment,
CASE
WHEN prev.customer_segment IS NULL THEN 'NEW_CUSTOMER'
WHEN cur.customer_segment IS NULL THEN 'CHURNED'
WHEN prev.customer_segment != cur.customer_segment THEN 'SEGMENT_CHANGE'
ELSE 'NO_CHANGE'
END
FROM `project.analytics.customer_rfm_scores` cur
FULL OUTER JOIN (
SELECT customer_id, customer_segment
FROM (
SELECT
customer_id,
current_segment as customer_segment,
ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY tracking_date DESC) as rn
FROM `project.analytics.segment_history`
WHERE tracking_date < CURRENT_DATE()
)
WHERE rn = 1 AND customer_segment IS NOT NULL
) prev ON cur.customer_id = prev.customer_id
WHERE prev.customer_segment != cur.customer_segment
OR prev.customer_segment IS NULL
OR cur.customer_segment IS NULL;
END;
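The segment CASE expression in the procedure above is evaluated top to bottom, and the first matching branch wins. The following Python sketch mirrors the recency scoring and the segment rules so that individual score combinations can be checked by hand; the function names are illustrative, and the frequency and monetary scores are assumed to be the NTILE(5) values already computed in SQL.

```python
def recency_score(recency_days: int) -> int:
    """Map days since the last order to a 1-5 score (5 = most recent)."""
    for limit, score in ((30, 5), (90, 4), (180, 3), (365, 2)):
        if recency_days <= limit:
            return score
    return 1


def classify_segment(r: int, f: int, m: int) -> str:
    """Apply the segment rules in order; the first matching rule wins."""
    if r >= 4 and f >= 4 and m >= 4:
        return "Champions"
    if r >= 3 and f >= 3 and m >= 4:
        return "Loyal Customers"
    if r >= 4 and f <= 2:
        return "New Customers"
    if r >= 3 and f >= 3 and m <= 2:
        return "Potential Loyalists"
    if r >= 3 and f <= 2 and m <= 2:
        return "Promising"
    if r <= 2 and f >= 3 and m >= 3:
        return "At Risk"
    if r <= 2 and f >= 3 and m <= 2:
        return "Cannot Lose Them"
    if r <= 2 and f <= 2 and m >= 3:
        return "Hibernating"
    return "Lost"
```

Walking a few combinations through the rules shows that a mid-range customer such as (3, 3, 3) matches none of the named branches and ends up in 'Lost', so the rule set may need an extra branch before it is used for campaigns.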
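8.3 Automating Inventory Optimization
-- Automated inventory replenishment
-- Schedule: "every day 06:00" (the scheduled query itself runs CALL `project.supply_chain.automated_inventory_management`();)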
CREATE OR REPLACE PROCEDURE `project.supply_chain.automated_inventory_management`()
BEGIN
-- 1. Compute stock levels from the demand forecast
CREATE OR REPLACE TABLE `project.supply_chain.inventory_recommendations` AS
WITH sales_velocity AS (
SELECT
product_id,
warehouse_id,
AVG(daily_sales) as avg_daily_sales,
STDDEV(daily_sales) as sales_stddev,
MAX(daily_sales) as max_daily_sales
FROM (
SELECT
product_id,
warehouse_id,
DATE(order_date) as sale_date,
SUM(quantity) as daily_sales
FROM `project.raw.orders`
WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY)
GROUP BY product_id, warehouse_id, DATE(order_date)
)
GROUP BY product_id, warehouse_id
HAVING COUNT(*) >= 30 -- require at least 30 days with sales
),
demand_forecast AS (
SELECT
sv.product_id,
sv.warehouse_id,
sv.avg_daily_sales,
-- 7-day demand forecast (upper bound at z = 1.96, roughly a 95% interval)
CEIL(sv.avg_daily_sales * 7 + 1.96 * sv.sales_stddev * SQRT(7)) as forecast_7days,
-- Safety stock (two weeks of demand plus a variability buffer)
CEIL(sv.avg_daily_sales * 14 + 2 * sv.sales_stddev * SQRT(14)) as safety_stock,
-- Maximum stock (one month of demand plus variability)
CEIL(sv.avg_daily_sales * 30 + 1.96 * sv.sales_stddev * SQRT(30)) as max_stock
FROM sales_velocity sv
),
current_inventory AS (
SELECT
product_id,
warehouse_id,
current_stock,
reserved_stock,
available_stock,
last_updated
FROM `project.supply_chain.inventory_status`
WHERE DATE(last_updated) = CURRENT_DATE()
)
SELECT
df.product_id,
df.warehouse_id,
p.product_name,
p.unit_cost,
df.avg_daily_sales,
df.forecast_7days,
df.safety_stock,
df.max_stock,
COALESCE(ci.available_stock, 0) as current_available,
-- Quantity to reorder
GREATEST(0, df.safety_stock - COALESCE(ci.available_stock, 0)) as reorder_quantity,
-- Reorder priority (based on the available stock level)
CASE
WHEN COALESCE(ci.available_stock, 0) = 0 THEN 'URGENT'
WHEN COALESCE(ci.available_stock, 0) <= df.avg_daily_sales * 3 THEN 'HIGH'
WHEN COALESCE(ci.available_stock, 0) <= df.safety_stock THEN 'MEDIUM'
WHEN COALESCE(ci.available_stock, 0) >= df.max_stock THEN 'OVERSTOCK'
ELSE 'NORMAL'
END as priority,
-- Estimated days until stockout
CASE
WHEN df.avg_daily_sales > 0 THEN
COALESCE(ci.available_stock, 0) / df.avg_daily_sales
ELSE 999
END as days_until_stockout,
-- Estimated order cost
GREATEST(0, df.safety_stock - COALESCE(ci.available_stock, 0)) * p.unit_cost as order_cost,
CURRENT_DATE() as recommendation_date
FROM demand_forecast df
JOIN `project.master.products` p ON df.product_id = p.product_id
LEFT JOIN current_inventory ci ON df.product_id = ci.product_id
AND df.warehouse_id = ci.warehouse_id;
-- 2. Create purchase orders automatically (urgent and high priority only)
INSERT INTO `project.supply_chain.purchase_orders` (
order_date,
product_id,
warehouse_id,
quantity_ordered,
unit_cost,
total_cost,
priority,
order_type,
expected_delivery_date,
status
)
SELECT
CURRENT_DATE(),
product_id,
warehouse_id,
reorder_quantity,
unit_cost,
order_cost,
priority,
'AUTO_REORDER' as order_type,
DATE_ADD(CURRENT_DATE(), INTERVAL 3 DAY) as expected_delivery_date,
'PENDING' as status
FROM `project.supply_chain.inventory_recommendations`
WHERE priority IN ('URGENT', 'HIGH')
AND reorder_quantity > 0;
-- 3. Create an alert
INSERT INTO `project.notifications.inventory_alerts` (
alert_date,
alert_type,
product_count,
total_order_value,
urgent_items,
high_priority_items
)
SELECT
CURRENT_DATE(),
'AUTOMATED_REORDER',
COUNT(*),
SUM(order_cost),
COUNT(CASE WHEN priority = 'URGENT' THEN 1 END),
COUNT(CASE WHEN priority = 'HIGH' THEN 1 END)
FROM `project.supply_chain.inventory_recommendations`
WHERE reorder_quantity > 0;
END;
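The three stock levels in the procedure above share one formula: mean daily sales times the horizon, plus a z-multiple of the daily standard deviation scaled by the square root of the horizon. The sketch below reproduces that arithmetic and the priority ladder in Python so the numbers can be verified for a single product; the function names and the sample values are illustrative only.

```python
import math


def stock_levels(avg_daily_sales: float, sales_stddev: float) -> dict:
    """Recompute forecast_7days, safety_stock and max_stock for one product."""

    def level(days: int, z: float) -> int:
        # Demand over `days` days has mean avg * days and, assuming independent
        # days, standard deviation stddev * sqrt(days).
        return math.ceil(avg_daily_sales * days + z * sales_stddev * math.sqrt(days))

    return {
        "forecast_7days": level(7, 1.96),
        "safety_stock": level(14, 2.0),
        "max_stock": level(30, 1.96),
    }


def reorder_priority(available: float, avg_daily_sales: float,
                     safety_stock: float, max_stock: float) -> str:
    """Same branch order as the CASE expression; the first match wins."""
    if available == 0:
        return "URGENT"
    if available <= avg_daily_sales * 3:
        return "HIGH"
    if available <= safety_stock:
        return "MEDIUM"
    if available >= max_stock:
        return "OVERSTOCK"
    return "NORMAL"
```

For a product selling 10 units a day with a standard deviation of 4, this gives a 7-day forecast of 91, a safety stock of 170 and a maximum stock of 343. With 25 units available, the priority is HIGH and the reorder quantity is 170 - 25 = 145.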
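9. Best Practices
9.1 Schedule Design Principles
Time Zone and Timing Considerations
-- Good: explicit time zone, chosen with the business process in mind
-- Every day at 2 a.m. (after the data sources have finished updating)
-- Schedule: "every day 02:00"
-- Timezone: "Asia/Seoul"
-- Bad: no time zone set, so the query runs on UTC
-- Schedule: "every day 02:00" (no time zone specified)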
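When a schedule has to be written in UTC, the local run time needs to be shifted by the zone offset first. The sketch below does this for Korea Standard Time with a fixed +09:00 offset (Korea does not observe daylight saving time). It assumes the schedule string is evaluated in UTC, which is worth confirming for your own transfer configuration; the helper name is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Fixed offset is sufficient here because KST has no daylight saving time.
KST = timezone(timedelta(hours=9), "KST")


def local_to_utc_schedule(local_hhmm: str, tz: timezone = KST) -> str:
    """Turn a local 'HH:MM' run time into an 'every day HH:MM' string in UTC."""
    hour, minute = map(int, local_hhmm.split(":"))
    # Any reference date works for a fixed-offset zone; only the time of day is used.
    local_dt = datetime(2024, 1, 1, hour, minute, tzinfo=tz)
    utc_dt = local_dt.astimezone(timezone.utc)
    return f"every day {utc_dt:%H:%M}"
```

A 02:00 run in Seoul corresponds to 17:00 UTC on the previous calendar day, which also shifts what CURRENT_DATE() returns inside the query unless a time zone argument is passed.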
Dependency Management
-- Dependency mapping table for scheduled queries
CREATE OR REPLACE TABLE `project.config.schedule_dependencies` (
parent_schedule STRING,
child_schedule STRING,
dependency_type STRING, -- 'HARD', 'SOFT'
max_wait_minutes INT64,
created_at TIMESTAMP
);
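-- Dependency check function
-- Assumes the parent schedule name appears in the job ID; adjust the match
-- (for example, to a job label) to fit how your jobs are identified.
-- A SQL UDF takes no LANGUAGE clause, and a subquery body needs double parentheses.
CREATE OR REPLACE FUNCTION `project.utils.check_dependency_ready`(
parent_schedule STRING
) RETURNS BOOL
AS ((
SELECT
-- TRUE when at least one matching job finished without error in the last 30 minutes
COUNTIF(
state = 'DONE'
AND error_result IS NULL
AND end_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 MINUTE)
) > 0
FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE job_id LIKE CONCAT('%', parent_schedule, '%')
AND DATE(creation_time) = CURRENT_DATE()
));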
-- Conditional execution that respects dependencies
CREATE OR REPLACE PROCEDURE `project.etl.dependent_processing`()
BEGIN
DECLARE dependencies_ready BOOL DEFAULT FALSE;
-- Check that the parent job has completed
SET dependencies_ready = `project.utils.check_dependency_ready`('parent_etl_job');
IF dependencies_ready THEN
-- Run the actual processing logic
CALL `project.etl.main_processing_logic`();
ELSE
-- Record an alert for the unmet dependency
INSERT INTO `project.monitoring.dependency_alerts` (
alert_time,
job_name,
missing_dependency,
message
) VALUES (
CURRENT_TIMESTAMP(),
'dependent_processing',
'parent_etl_job',
'Parent job not completed within expected timeframe'
);
END IF;
END;
Resource-Efficient Scheduling
-- Schedule design that spreads out resource usage
CREATE OR REPLACE TABLE `project.config.optimal_schedule_slots` (
time_slot STRING,
max_concurrent_jobs INT64,
current_jobs INT64,
resource_weight FLOAT64,
recommended_for STRING
);
INSERT INTO `project.config.optimal_schedule_slots` VALUES
('00:00-02:00', 5, 0, 1.0, 'Large ETL, data migration'),
('02:00-06:00', 8, 0, 0.8, 'Daily aggregation, report generation'),
('06:00-09:00', 3, 0, 0.6, 'Business-critical jobs'),
('09:00-18:00', 2, 0, 0.3, 'Real-time monitoring only'),
('18:00-00:00', 4, 0, 0.5, 'Weekly/monthly analysis jobs');
-- Procedure that recommends schedule time slots
CREATE OR REPLACE PROCEDURE `project.utils.recommend_schedule_time`(
job_type STRING,
estimated_duration_minutes INT64
)
BEGIN
SELECT
time_slot,
max_concurrent_jobs - current_jobs as available_slots,
recommended_for
FROM `project.config.optimal_schedule_slots`
WHERE REGEXP_CONTAINS(recommended_for, job_type)
AND current_jobs < max_concurrent_jobs
ORDER BY resource_weight DESC, available_slots DESC
LIMIT 3;
END;
Failure Recovery Strategy
-- Automatic recovery mechanism
CREATE OR REPLACE PROCEDURE `project.recovery.auto_recovery_scheduler`()
BEGIN
DECLARE failed_jobs ARRAY<STRING>;
DECLARE job_name STRING;
-- Find critical jobs that failed in the last 4 hours
SET failed_jobs = ARRAY(
SELECT job_id
FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 4 HOUR)
AND state = 'DONE'
AND error_result IS NOT NULL
-- labels is an array of key/value structs, so it has to be unnested
AND EXISTS (
SELECT 1 FROM UNNEST(labels) AS label
WHERE label.key = 'critical' AND label.value = 'true'
)
);
-- Attempt recovery for each failed job
FOR job IN (SELECT job_id FROM UNNEST(failed_jobs) as job_id) DO
-- Check the recovery condition (at most 3 attempts per day)
IF (SELECT COUNT(*) FROM `project.monitoring.recovery_attempts`
WHERE original_job_id = job.job_id AND DATE(attempt_date) = CURRENT_DATE()) < 3 THEN
-- Run the recovery job
CALL `project.recovery.retry_failed_job`(job.job_id);
-- Log the recovery attempt
INSERT INTO `project.monitoring.recovery_attempts` (
original_job_id,
attempt_date,
recovery_method,
initiated_by
) VALUES (
job.job_id,
CURRENT_DATE(),
'auto_retry',
'system'
);
END IF;
END FOR;
END;
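The recovery loop above retries a failed job only while fewer than three attempts have been logged for it on the current day. A minimal Python sketch of that gating rule is shown here, with plain lists standing in for the job list and the `recovery_attempts` table; the names are illustrative.

```python
from collections import Counter
from datetime import date

MAX_ATTEMPTS_PER_DAY = 3


def jobs_to_retry(failed_job_ids, attempts, today):
    """Return the failed jobs that are still under today's retry cap.

    attempts: iterable of (original_job_id, attempt_date) tuples.
    """
    # Count only today's attempts, as the procedure filters on attempt_date.
    todays_counts = Counter(job_id for job_id, day in attempts if day == today)
    return [job_id for job_id in failed_job_ids
            if todays_counts[job_id] < MAX_ATTEMPTS_PER_DAY]
```

Because the cap is per day, a job that keeps failing is retried again after midnight; a lifetime cap or a backoff delay would need an additional condition.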
Per-Environment Schedule Management
-- Manage schedule settings per environment
CREATE OR REPLACE TABLE `project.config.environment_schedules` (
schedule_name STRING,
dev_schedule STRING,
staging_schedule STRING,
prod_schedule STRING,
is_active_dev BOOL,
is_active_staging BOOL,
is_active_prod BOOL
);
INSERT INTO `project.config.environment_schedules` VALUES
('daily_etl', 'every 2 hours', 'every day 01:00', 'every day 02:00', true, true, true),
('weekly_report', 'every day 10:00', 'every monday 09:00', 'every monday 08:00', false, true, true),
('monthly_closing', 'every day 15:00', '1 of month 10:00', '1 of month 06:00', false, true, true);
-- Resolve the schedule for a given environment
-- Parameters are prefixed with p_ so the schedule_name column does not shadow them
-- (schedule_name = schedule_name would compare the column with itself).
-- A SQL UDF takes no LANGUAGE clause.
CREATE OR REPLACE FUNCTION `project.utils.get_schedule_for_environment`(
p_schedule_name STRING,
p_environment STRING
) RETURNS STRING
AS (
CASE p_environment
WHEN 'dev' THEN (SELECT dev_schedule FROM `project.config.environment_schedules` WHERE schedule_name = p_schedule_name AND is_active_dev)
WHEN 'staging' THEN (SELECT staging_schedule FROM `project.config.environment_schedules` WHERE schedule_name = p_schedule_name AND is_active_staging)
WHEN 'prod' THEN (SELECT prod_schedule FROM `project.config.environment_schedules` WHERE schedule_name = p_schedule_name AND is_active_prod)
ELSE 'invalid_environment'
END
);
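The lookup above has three outcomes: the schedule string when the entry is active in that environment, no value when it is inactive or unknown, and 'invalid_environment' for an unrecognized environment name. The Python sketch below models those outcomes with an in-memory dictionary that copies the three sample rows; it is an illustration of the intended behavior, not a replacement for the table.

```python
from typing import Optional

# (schedule, is_active) per environment, copied from the sample rows above.
SCHEDULES = {
    "daily_etl": {"dev": ("every 2 hours", True),
                  "staging": ("every day 01:00", True),
                  "prod": ("every day 02:00", True)},
    "weekly_report": {"dev": ("every day 10:00", False),
                      "staging": ("every monday 09:00", True),
                      "prod": ("every monday 08:00", True)},
    "monthly_closing": {"dev": ("every day 15:00", False),
                        "staging": ("1 of month 10:00", True),
                        "prod": ("1 of month 06:00", True)},
}


def get_schedule_for_environment(schedule_name: str, environment: str) -> Optional[str]:
    if environment not in ("dev", "staging", "prod"):
        return "invalid_environment"
    entry = SCHEDULES.get(schedule_name)
    if entry is None:
        return None  # unknown schedule: the SQL subquery would return no row (NULL)
    schedule, is_active = entry[environment]
    return schedule if is_active else None  # inactive entries also resolve to NULL
```

Callers should treat a missing value as "do not schedule in this environment" rather than as an error.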
9.2 Code Structure and Modularization
-- 1. Keep shared functions in a separate dataset
-- `project.utils.date_functions`
-- A SQL UDF takes no LANGUAGE clause; the body is a parenthesized expression
CREATE OR REPLACE FUNCTION `project.utils.get_business_days_between`(
start_date DATE,
end_date DATE
) RETURNS INT64
AS (
(SELECT COUNT(*)
FROM UNNEST(GENERATE_DATE_ARRAY(start_date, end_date)) as d
WHERE EXTRACT(DAYOFWEEK FROM d) BETWEEN 2 AND 6) -- Monday to Friday (1 = Sunday)
);
-- 2. Use a configuration table
CREATE OR REPLACE TABLE `project.config.etl_parameters` (
parameter_name STRING,
parameter_value STRING,
parameter_type STRING,
description STRING,
updated_at TIMESTAMP
);
INSERT INTO `project.config.etl_parameters` VALUES
('batch_size', '10000', 'INTEGER', 'Default batch processing size', CURRENT_TIMESTAMP()),
('retention_days', '365', 'INTEGER', 'Data retention period', CURRENT_TIMESTAMP()),
('alert_email', 'data-team@company.com', 'STRING', 'Alert notification email', CURRENT_TIMESTAMP());
-- 3. Parameterized procedure
CREATE OR REPLACE PROCEDURE `project.etl.configurable_processing`(
process_date DATE,
batch_size INT64
)
BEGIN
DECLARE actual_batch_size INT64;
-- Use the argument when given; otherwise fall back to the configuration table
SET actual_batch_size = COALESCE(
batch_size,
(SELECT CAST(parameter_value AS INT64)
FROM `project.config.etl_parameters`
WHERE parameter_name = 'batch_size')
);
-- Actual processing logic
CALL `project.etl.batch_process_orders`(process_date, actual_batch_size);
END;
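The `get_business_days_between` function defined earlier in this section counts weekdays over an inclusive date range. A small Python equivalent can be used to cross-check its results; note that BigQuery's DAYOFWEEK numbers Sunday as 1, whereas Python's `weekday()` numbers Monday as 0. Like the SQL version, this sketch ignores public holidays.

```python
from datetime import date, timedelta


def business_days_between(start_date: date, end_date: date) -> int:
    """Count Monday-Friday dates in the inclusive range [start_date, end_date]."""
    if end_date < start_date:
        return 0  # GENERATE_DATE_ARRAY yields an empty array in this case
    total_days = (end_date - start_date).days + 1
    return sum(
        1
        for offset in range(total_days)
        if (start_date + timedelta(days=offset)).weekday() < 5  # 0-4 = Mon-Fri
    )
```

For example, the week from Monday 2024-01-01 through Sunday 2024-01-07 contains five business days.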
9.3 Testing and Validation
-- Test framework for the data pipeline
CREATE OR REPLACE PROCEDURE `project.testing.run_etl_tests`()
BEGIN
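-- Start from an empty array: ARRAY_CONCAT returns NULL when any argument is NULL
DECLARE test_results ARRAY<STRUCT<test_name STRING, status STRING, message STRING>> DEFAULT [];
-- Test 1: data volume check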
IF (SELECT COUNT(*) FROM `project.processed.daily_summary`
WHERE summary_date = CURRENT_DATE()) = 0 THEN
SET test_results = ARRAY_CONCAT(test_results, [
STRUCT('volume_check' as test_name, 'FAIL' as status, 'No data found for today' as message)
]);
ELSE
SET test_results = ARRAY_CONCAT(test_results, [
STRUCT('volume_check' as test_name, 'PASS' as status, 'Data volume OK' as message)
]);
END IF;
-- Test 2: data quality check
IF EXISTS (
SELECT 1 FROM `project.processed.daily_summary`
WHERE summary_date = CURRENT_DATE()
AND (total_revenue < 0 OR customer_count < 0)
) THEN
SET test_results = ARRAY_CONCAT(test_results, [
STRUCT('quality_check' as test_name, 'FAIL' as status, 'Negative values found' as message)
]);
ELSE
SET test_results = ARRAY_CONCAT(test_results, [
STRUCT('quality_check' as test_name, 'PASS' as status, 'Data quality OK' as message)
]);
END IF;
-- Store the test results
INSERT INTO `project.testing.test_results` (
test_date,
test_name,
status,
message,
executed_at
)
SELECT
CURRENT_DATE(),
result.test_name,
result.status,
result.message,
CURRENT_TIMESTAMP()
FROM UNNEST(test_results) as result;
-- Raise an alert if any test failed
IF EXISTS (SELECT 1 FROM UNNEST(test_results) WHERE status = 'FAIL') THEN
INSERT INTO `project.notifications.test_alerts` (
alert_timestamp,
failed_tests,
test_summary
)
SELECT
CURRENT_TIMESTAMP(),
ARRAY(SELECT test_name FROM UNNEST(test_results) WHERE status = 'FAIL'),
STRING_AGG(CONCAT(test_name, ': ', message), '; ')
FROM UNNEST(test_results);
END IF;
END;
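The procedure above follows a simple pattern: run each check, append one result record per check to an accumulator, then derive the list of failures from it. In BigQuery scripting the accumulator has to start as an empty array rather than NULL, because ARRAY_CONCAT returns NULL when any argument is NULL. The Python sketch below shows the same pattern with in-memory rows; the function name and row shape are assumptions for illustration.

```python
def run_checks(rows):
    """rows: today's summary rows as dicts with total_revenue and customer_count."""
    results = []  # accumulator starts empty, never None

    # Check 1: volume. At least one row must exist for today.
    has_rows = len(rows) > 0
    results.append({
        "test_name": "volume_check",
        "status": "PASS" if has_rows else "FAIL",
        "message": "Data volume OK" if has_rows else "No data found for today",
    })

    # Check 2: quality. No negative revenue or customer counts.
    has_negative = any(r["total_revenue"] < 0 or r["customer_count"] < 0 for r in rows)
    results.append({
        "test_name": "quality_check",
        "status": "FAIL" if has_negative else "PASS",
        "message": "Negative values found" if has_negative else "Data quality OK",
    })

    failed = [r["test_name"] for r in results if r["status"] == "FAIL"]
    return results, failed
```

Keeping every result, not only the failures, makes it possible to see later that a check ran and passed, which is what the `test_results` table is for.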
9.4 Documentation and Metadata Management
-- Metadata for scheduled queries
CREATE OR REPLACE TABLE `project.metadata.scheduled_queries` (
query_name STRING,
description STRING,
schedule_expression STRING,
owner_email STRING,
dependencies ARRAY<STRING>,
output_tables ARRAY<STRING>,
business_purpose STRING,
sla_hours INT64,
created_date DATE,
last_modified_date DATE,
is_active BOOL
);
INSERT INTO `project.metadata.scheduled_queries` VALUES
(
'daily_customer_summary',
'Daily aggregation of customer metrics including orders, revenue, and engagement',
'every day 02:00',
'data-team@company.com',
['raw.orders', 'raw.customers'],
['marts.customer_daily_summary'],
'Support customer analytics and segmentation for marketing team',
4, -- 4-hour SLA
'2024-01-01',
CURRENT_DATE(),
true
);
-- Inventory view of scheduled queries
CREATE OR REPLACE VIEW `project.metadata.query_inventory` AS
SELECT
sq.query_name,
sq.description,
sq.schedule_expression,
sq.owner_email,
sq.sla_hours,
-- Recent execution info (joined from INFORMATION_SCHEMA)
recent.last_execution,
recent.last_status,
recent.avg_duration_minutes,
recent.total_executions_7days,
recent.success_rate_7days
FROM `project.metadata.scheduled_queries` sq
LEFT JOIN (
SELECT
'daily_customer_summary' as query_name, -- placeholder; in practice, map job_id to the query name
MAX(end_time) as last_execution,
MAX(state) as last_status,
AVG(TIMESTAMP_DIFF(end_time, start_time, SECOND)) / 60 as avg_duration_minutes,
COUNT(*) as total_executions_7days,
COUNT(CASE WHEN state = 'DONE' AND error_result IS NULL THEN 1 END)
/ COUNT(*) * 100 as success_rate_7days
FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE DATE(creation_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
AND job_type = 'QUERY'
GROUP BY query_name
) recent ON sq.query_name = recent.query_name
WHERE sq.is_active = true;
9.5 Operations Guidelines
-- Automating the operations checklist
CREATE OR REPLACE PROCEDURE `project.operations.daily_health_check`()
BEGIN
DECLARE health_summary STRING;
-- 1. Check the execution status of scheduled queries
CREATE OR REPLACE TEMP TABLE daily_execution_status AS
SELECT
'scheduled_queries' as check_category,
COUNT(*) as total_jobs,
COUNT(CASE WHEN state = 'DONE' AND error_result IS NULL THEN 1 END) as successful_jobs,
COUNT(CASE WHEN state = 'RUNNING' THEN 1 END) as running_jobs,
COUNT(CASE WHEN state = 'DONE' AND error_result IS NOT NULL THEN 1 END) as failed_jobs
FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE DATE(creation_time) = CURRENT_DATE()
AND job_type = 'QUERY'
AND user_email LIKE '%scheduled-query%';
-- 2. Check data freshness
-- INFORMATION_SCHEMA.TABLES does not expose a last-modified time, so this reads the
-- dataset's __TABLES__ metadata (last_modified_time is in epoch milliseconds; type = 1 is a table).
INSERT INTO daily_execution_status
SELECT
'data_freshness' as check_category,
COUNT(*) as total_tables,
COUNT(CASE WHEN TIMESTAMP_MILLIS(last_modified_time) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
THEN 1 END) as fresh_tables,
0 as running_jobs,
COUNT(CASE WHEN TIMESTAMP_MILLIS(last_modified_time) < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 48 HOUR)
THEN 1 END) as stale_tables
FROM `project.marts.__TABLES__`
WHERE type = 1;
-- 3. Check resource usage
INSERT INTO daily_execution_status
SELECT
'resource_usage' as check_category,
COUNT(*) as total_queries,
COUNT(CASE WHEN total_slot_ms < 3600000 THEN 1 END) as normal_usage, -- under 1 slot-hour
0 as running_jobs,
COUNT(CASE WHEN total_slot_ms >= 3600000 THEN 1 END) as high_usage -- 1 slot-hour or more
FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE DATE(creation_time) = CURRENT_DATE();
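-- 4. Build the overall status report
-- CONCAT accepts only STRING or BYTES, so the counts are cast explicitly
SET health_summary = (
SELECT STRING_AGG(
CONCAT(
check_category, ': ',
CAST(successful_jobs AS STRING), '/', CAST(total_jobs AS STRING), ' OK, ',
CAST(failed_jobs AS STRING), ' failed'
),
' | '
)
FROM daily_execution_status
);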
-- 5. Compute the health score
CREATE OR REPLACE TABLE `project.operations.daily_health_score` AS
SELECT
CURRENT_DATE() as check_date,
health_summary,
-- Overall score (out of 100)
ROUND(
(SUM(successful_jobs) / NULLIF(SUM(total_jobs), 0) * 50) + -- success rate: 50 points
(COUNTIF(failed_jobs = 0) / COUNT(*) * 30) + -- categories with no failures: 30 points
(COUNTIF(running_jobs = 0) / COUNT(*) * 20) -- nothing still running: 20 points
) as health_score,
CASE
WHEN ROUND((SUM(successful_jobs) / NULLIF(SUM(total_jobs), 0) * 50) +
(COUNTIF(failed_jobs = 0) / COUNT(*) * 30) +
(COUNTIF(running_jobs = 0) / COUNT(*) * 20)) >= 90 THEN 'EXCELLENT'
WHEN ROUND((SUM(successful_jobs) / NULLIF(SUM(total_jobs), 0) * 50) +
(COUNTIF(failed_jobs = 0) / COUNT(*) * 30) +
(COUNTIF(running_jobs = 0) / COUNT(*) * 20)) >= 70 THEN 'GOOD'
WHEN ROUND((SUM(successful_jobs) / NULLIF(SUM(total_jobs), 0) * 50) +
(COUNTIF(failed_jobs = 0) / COUNT(*) * 30) +
(COUNTIF(running_jobs = 0) / COUNT(*) * 20)) >= 50 THEN 'FAIR'
ELSE 'POOR'
END as health_status,
CURRENT_TIMESTAMP() as generated_at
FROM daily_execution_status;
-- 6. Raise an alert when the score is below 70
IF (SELECT health_score FROM `project.operations.daily_health_score`
WHERE check_date = CURRENT_DATE()) < 70 THEN
INSERT INTO `project.notifications.operations_alerts` (
alert_timestamp,
alert_type,
health_score,
health_summary
)
SELECT
CURRENT_TIMESTAMP(),
'LOW_HEALTH_SCORE',
health_score,
health_summary
FROM `project.operations.daily_health_score`
WHERE check_date = CURRENT_DATE();
END IF;
END;
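The health score in the procedure above is a weighted sum over the check categories: 50 points for the overall success ratio, 30 points for the share of categories with no failures, and 20 points for the share of categories with nothing still running. The Python sketch below recomputes it for a handful of rows so the thresholds can be sanity-checked; the row dictionaries mirror the columns of `daily_execution_status`, and the function names are illustrative.

```python
import math


def health_score(rows):
    """rows: dicts with total_jobs, successful_jobs, running_jobs, failed_jobs."""
    total = sum(r["total_jobs"] for r in rows)
    if not rows or total == 0:
        return None  # NULLIF(SUM(total_jobs), 0) makes the SQL score NULL here
    success_part = sum(r["successful_jobs"] for r in rows) / total * 50
    no_failure_part = sum(1 for r in rows if r["failed_jobs"] == 0) / len(rows) * 30
    not_running_part = sum(1 for r in rows if r["running_jobs"] == 0) / len(rows) * 20
    # BigQuery ROUND rounds halves away from zero; floor(x + 0.5) matches for x >= 0.
    return math.floor(success_part + no_failure_part + not_running_part + 0.5)


def health_status(score):
    if score is None:
        return "POOR"  # a NULL score matches no WHEN branch in the CASE expression
    if score >= 90:
        return "EXCELLENT"
    if score >= 70:
        return "GOOD"
    if score >= 50:
        return "FAIR"
    return "POOR"
```

With three categories reporting 9/10, 5/5 and 18/20 successes and no running jobs, the parts are about 45.7, 10 and 20, giving a score of 76 ('GOOD'). Because the second and third terms count categories rather than jobs, a single failure in one category costs the same 10 points as many failures.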
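Used well, BigQuery scheduled queries let you automate complex data pipelines and run a stable data operations environment. With appropriate error handling, monitoring, and testing in place, you can build data infrastructure that your teams can rely on.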