스케줄된 쿼리

쿼리 자동화

BigQuery에서 스케줄된 쿼리를 활용한 자동화된 데이터 파이프라인 구축 방법을 다루는 종합 가이드입니다.


목차

  1. 스케줄된 쿼리 개요
  2. 스케줄링 설정
  3. 데이터 파이프라인 구축
  4. 증분 처리 전략
  5. 오류 처리 및 재시도
  6. 모니터링 및 알림
  7. 성능 최적화
  8. 실제 활용 사례
  9. 모범 사례

1. 스케줄된 쿼리 개요

1.1 스케줄된 쿼리란?

스케줄된 쿼리(Scheduled Query)는 지정된 시간에 자동으로 실행되는 BigQuery 쿼리입니다. ETL 파이프라인, 데이터 집계, 보고서 생성 등을 자동화할 수 있습니다.

1.2 주요 특징

  • 자동 실행: 정해진 스케줄에 따라 자동 실행
  • 결과 저장: 쿼리 결과를 테이블에 자동 저장
  • 재시도 메커니즘: 실패 시 자동 재시도
  • 알림 기능: 실행 결과 알림 발송
  • 버전 관리: 쿼리 변경 이력 추적

1.3 기본 워크플로우

# gcloud CLI를 사용한 스케줄된 쿼리 생성
bq mk \
  --transfer_config \
  --project_id=PROJECT_ID \
  --target_dataset=DATASET_ID \
  --display_name="Daily Sales Summary" \
  --data_source=scheduled_query \
  --schedule="every 24 hours" \
  --params='{
    "query": "SELECT DATE(order_timestamp) as date, SUM(amount) as total_sales FROM `project.dataset.orders` WHERE DATE(order_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) GROUP BY date",
    "destination_table_name_template": "daily_sales_{run_date}",
    "write_disposition": "WRITE_TRUNCATE"
  }'

2. 스케줄링 설정

2.1 기본 스케줄 패턴

-- 매일 오전 6시 실행
-- Schedule: "every day 06:00"

-- 매주 월요일 오전 9시 실행  
-- Schedule: "every monday 09:00"

-- 매월 1일 오전 8시 실행
-- Schedule: "1 of month 08:00"

-- 매시간 실행
-- Schedule: "every 1 hours"

-- 30분마다 실행
-- Schedule: "every 30 minutes"

-- 특정 시간대 설정
-- Schedule: "every day 14:00"
-- Timezone: "Asia/Seoul"

2.2 Cron 표현식 사용

# 매일 오전 2시 30분 실행
# Cron: "30 2 * * *"

# 평일 오전 9시 실행 (월-금)
# Cron: "0 9 * * 1-5"

# 매달 15일 오후 6시 실행
# Cron: "0 18 15 * *"

# 매 15분마다 실행
# Cron: "*/15 * * * *"

# 매주 일요일 자정 실행
# Cron: "0 0 * * 0"

2.3 웹 UI를 통한 스케줄 설정

BigQuery 웹 콘솔에서 스케줄된 쿼리를 생성하는 단계별 방법:

단계 1: 쿼리 작성 및 스케줄 설정 시작

-- 예시: 일일 매출 집계 쿼리
SELECT 
  DATE(order_timestamp) as order_date,
  COUNT(DISTINCT order_id) as total_orders,
  SUM(order_amount) as total_revenue,
  COUNT(DISTINCT customer_id) as unique_customers,
  AVG(order_amount) as avg_order_value
FROM `project.raw.orders`
WHERE DATE(order_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY order_date;

단계 2: 스케줄 옵션 설정

  1. 쿼리 실행 후 “스케줄된 쿼리 만들기” 클릭
  2. 기본 정보 입력:
    • 쿼리 이름: daily_sales_summary
    • 설명: Daily sales metrics aggregation
  3. 스케줄 설정:
    반복 유형: 매일
    시작 시간: 02:00 (오전 2시)
    시간대: Asia/Seoul
    시작 날짜: 오늘
    종료 날짜: 설정 안함 (무기한 실행)
    
  4. 고급 스케줄 옵션:
    재시도 설정: 최대 3회
    재시도 간격: 10분
    실행 시간 초과: 6시간
    

단계 3: 대상 설정

프로젝트 ID: my-project
데이터셋 ID: analytics
테이블 ID: daily_sales_summary
파티션: order_date 컬럼 기준 (선택사항)
쓰기 기본 설정: 매일 덮어쓰기 (WRITE_TRUNCATE)

단계 4: 알림 설정

이메일 알림: data-team@company.com
실패 시에만 알림: 체크
Pub/Sub 알림: projects/my-project/topics/bq-notifications (선택사항)

2.4 CLI를 통한 고급 스케줄 설정

# 복잡한 스케줄된 쿼리 생성 예시
bq mk \
  --transfer_config \
  --project_id=my-project \
  --target_dataset=analytics \
  --display_name="Advanced Daily ETL Pipeline" \
  --data_source=scheduled_query \
  --schedule="every day 02:00" \
  --time_zone="Asia/Seoul" \
  --notification_pubsub_topic="projects/my-project/topics/etl-notifications" \
  --params='{
    "query": "CALL `my-project.etl.daily_processing_pipeline`(@run_date)",
    "destination_table_name_template": "daily_summary_{run_date|'%Y%m%d'}",
    "write_disposition": "WRITE_TRUNCATE",
    "query_parameters": [
      {
        "name": "run_date",
        "parameterType": {"type": "DATE"},
        "parameterValue": {"value": "{run_date}"}
      }
    ]
  }'

# 파라미터가 있는 쿼리 스케줄링
bq mk \
  --transfer_config \
  --project_id=my-project \
  --target_dataset=reporting \
  --display_name="Weekly Report with Parameters" \
  --data_source=scheduled_query \
  --schedule="every monday 09:00" \
  --params='{
    "query": "SELECT customer_segment, SUM(revenue) as total_revenue FROM `my-project.analytics.customer_metrics` WHERE report_date BETWEEN @start_date AND @end_date GROUP BY customer_segment",
    "destination_table_name_template": "weekly_report_{run_time|'%Y%m%d'}",
    "query_parameters": [
      {
        "name": "start_date",
        "parameterType": {"type": "DATE"},
        "parameterValue": {"value": "{run_date-7}"}
      },
      {
        "name": "end_date", 
        "parameterType": {"type": "DATE"},
        "parameterValue": {"value": "{run_date-1}"}
      }
    ]
  }'

2.5 스케줄 수정 및 관리

기존 스케줄 수정

# 스케줄 시간 변경
bq update transfer \
  --transfer_config_id=projects/123/locations/us/transferConfigs/456 \
  --schedule="every day 01:00"

# 쿼리 내용 수정
bq update transfer \
  --transfer_config_id=projects/123/locations/us/transferConfigs/456 \
  --params='{
    "query": "SELECT DATE(created_at) as date, COUNT(*) as count FROM `project.dataset.new_table` WHERE DATE(created_at) = CURRENT_DATE() GROUP BY date"
  }'

# 대상 테이블 변경
bq update transfer \
  --transfer_config_id=projects/123/locations/us/transferConfigs/456 \
  --target_dataset=new_dataset

# 알림 설정 변경
bq update transfer \
  --transfer_config_id=projects/123/locations/us/transferConfigs/456 \
  --notification_pubsub_topic=projects/my-project/topics/new-notifications

스케줄 일시 정지 및 재개

# 스케줄 일시 정지
bq update transfer \
  --transfer_config_id=projects/123/locations/us/transferConfigs/456 \
  --no_auto_scheduling

# 스케줄 재개
bq update transfer \
  --transfer_config_id=projects/123/locations/us/transferConfigs/456 \
  --auto_scheduling

# 즉시 실행 (스케줄과 별개)
bq mk transfer run \
  --transfer_config_id=projects/123/locations/us/transferConfigs/456 \
  --run_time="2024-01-15T10:00:00Z"

2.6 동적 스케줄 관리

-- 조건부 실행을 위한 스케줄 쿼리
CREATE OR REPLACE PROCEDURE `project.automation.conditional_scheduler`()
BEGIN
  DECLARE should_run BOOL DEFAULT FALSE;
  DECLARE current_hour INT64;
  DECLARE current_day STRING;
  DECLARE is_holiday BOOL DEFAULT FALSE;
  
  SET current_hour = EXTRACT(HOUR FROM CURRENT_DATETIME());
  SET current_day = FORMAT_DATETIME('%A', CURRENT_DATETIME());
  
  -- 휴일 확인
  SET is_holiday = (
    SELECT COUNT(*) > 0
    FROM `project.reference.holidays`
    WHERE holiday_date = CURRENT_DATE()
  );
  
  -- 비즈니스 로직에 따른 실행 조건
  IF current_day IN ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday') 
     AND current_hour BETWEEN 8 AND 18 
     AND NOT is_holiday THEN
    SET should_run = TRUE;
  END IF;
  
  -- 조건을 만족할 때만 실제 작업 실행
  IF should_run THEN
    CALL `project.etl.daily_data_processing`();
    
    -- 실행 로그 기록
    INSERT INTO `project.monitoring.scheduler_log` (
      execution_date,
      procedure_name,
      execution_reason,
      executed_at
    ) VALUES (
      CURRENT_DATE(),
      'daily_data_processing',
      'Business hours on weekday',
      CURRENT_TIMESTAMP()
    );
  ELSE
    -- 스킵 로그 기록
    INSERT INTO `project.monitoring.scheduler_log` (
      execution_date,
      procedure_name,
      execution_reason,
      executed_at
    ) VALUES (
      CURRENT_DATE(),
      'daily_data_processing',
      CONCAT('Skipped - Day: ', current_day, ', Hour: ', current_hour, ', Holiday: ', is_holiday),
      CURRENT_TIMESTAMP()
    );
  END IF;
END;

2.7 스케줄 템플릿 활용

-- 재사용 가능한 스케줄 템플릿
CREATE OR REPLACE TABLE `project.config.schedule_templates` (
  template_name STRING,
  schedule_expression STRING,
  timezone STRING,
  description STRING,
  use_case STRING
);

INSERT INTO `project.config.schedule_templates` VALUES
('daily_early_morning', 'every day 02:00', 'Asia/Seoul', '매일 새벽 2시 실행', 'ETL, 데이터 정제'),
('daily_business_hours', 'every day 09:00', 'Asia/Seoul', '매일 오전 9시 실행', '비즈니스 리포트'),
('hourly_business', 'every 1 hours', 'Asia/Seoul', '매시간 실행 (24시간)', '실시간 모니터링'),
('weekday_evening', '0 18 * * 1-5', 'Asia/Seoul', '평일 오후 6시 실행', '주간 분석'),
('weekly_monday', 'every monday 08:00', 'Asia/Seoul', '매주 월요일 오전 8시', '주간 리포트'),
('monthly_first', '1 of month 06:00', 'Asia/Seoul', '매월 1일 오전 6시', '월간 집계'),
('quarterly', '0 6 1 */3 *', 'Asia/Seoul', '분기별 첫날 오전 6시', '분기별 분석');

-- 템플릿을 사용한 스케줄 설정 함수
CREATE OR REPLACE PROCEDURE `project.utils.create_scheduled_query_from_template`(
  template_name STRING,
  query_name STRING,
  query_sql STRING,
  target_dataset STRING,
  target_table STRING
)
BEGIN
  DECLARE schedule_expr STRING;
  DECLARE tz STRING;
  
  -- 템플릿 정보 조회
  SELECT schedule_expression, timezone
  INTO schedule_expr, tz
  FROM `project.config.schedule_templates`
  WHERE template_name = template_name;
  
  -- 실제 스케줄된 쿼리 생성을 위한 정보 출력
  SELECT 
    CONCAT('bq mk --transfer_config --display_name="', query_name, 
           '" --data_source=scheduled_query --schedule="', schedule_expr,
           '" --time_zone="', tz, '" --target_dataset=', target_dataset,
           ' --params=\'{"query":"', REPLACE(query_sql, '"', '\\"'), 
           '","destination_table_name_template":"', target_table, 
           '","write_disposition":"WRITE_TRUNCATE"}\'') as bq_command;
END;

3. 데이터 파이프라인 구축

3.1 기본 ETL 파이프라인

-- 일일 데이터 집계 파이프라인
-- Schedule: "every day 02:00"

-- 1단계: 원시 데이터 정제
CREATE OR REPLACE TABLE `project.staging.cleaned_orders` AS
SELECT 
  order_id,
  customer_id,
  PARSE_DATETIME('%Y-%m-%d %H:%M:%S', order_timestamp_str) as order_timestamp,
  SAFE_CAST(amount_str AS FLOAT64) as amount,
  UPPER(TRIM(status)) as status,
  CURRENT_DATETIME() as processed_at
FROM `project.raw.orders`
WHERE DATE(PARSE_DATETIME('%Y-%m-%d %H:%M:%S', order_timestamp_str)) = 
      DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  AND SAFE_CAST(amount_str AS FLOAT64) > 0;

-- 2단계: 비즈니스 로직 적용
CREATE OR REPLACE TABLE `project.marts.daily_sales_summary` AS
WITH order_metrics AS (
  SELECT 
    DATE(order_timestamp) as order_date,
    customer_id,
    COUNT(*) as order_count,
    SUM(amount) as total_amount,
    AVG(amount) as avg_amount,
    MIN(amount) as min_amount,
    MAX(amount) as max_amount
  FROM `project.staging.cleaned_orders`
  GROUP BY order_date, customer_id
),
customer_segments AS (
  SELECT 
    customer_id,
    CASE 
      WHEN total_amount >= 1000 THEN 'VIP'
      WHEN total_amount >= 500 THEN 'Premium'
      WHEN total_amount >= 200 THEN 'Regular'
      ELSE 'Basic'
    END as customer_segment
  FROM order_metrics
)
SELECT 
  om.order_date,
  om.customer_id,
  cs.customer_segment,
  om.order_count,
  om.total_amount,
  om.avg_amount,
  -- 고객 등급별 매출 기여도
  om.total_amount / SUM(om.total_amount) OVER (PARTITION BY om.order_date) * 100 as contribution_pct,
  CURRENT_DATETIME() as created_at
FROM order_metrics om
JOIN customer_segments cs ON om.customer_id = cs.customer_id;

-- 3단계: 품질 검증
INSERT INTO `project.monitoring.data_quality_checks` (
  check_date,
  table_name,
  check_type,
  expected_value,
  actual_value,
  status
)
SELECT 
  CURRENT_DATE() as check_date,
  'daily_sales_summary' as table_name,
  'row_count' as check_type,
  NULL as expected_value,
  COUNT(*) as actual_value,
  CASE WHEN COUNT(*) > 0 THEN 'PASS' ELSE 'FAIL' END as status
FROM `project.marts.daily_sales_summary`
WHERE order_date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY);

3.2 다단계 의존성 파이프라인

-- 파이프라인 1: 기본 데이터 처리 (매일 01:00)
CREATE OR REPLACE PROCEDURE `project.etl.stage1_base_processing`()
BEGIN
  -- 고객 데이터 업데이트
  MERGE `project.master.customers` target
  USING (
    SELECT 
      customer_id,
      email,
      first_name,
      last_name,
      registration_date,
      last_login_date
    FROM `project.raw.customer_updates`
    WHERE DATE(updated_at) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  ) source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET
    email = source.email,
    first_name = source.first_name,
    last_name = source.last_name,
    last_login_date = source.last_login_date,
    updated_at = CURRENT_DATETIME()
  WHEN NOT MATCHED THEN INSERT (
    customer_id, email, first_name, last_name, 
    registration_date, created_at
  ) VALUES (
    source.customer_id, source.email, source.first_name, 
    source.last_name, source.registration_date, CURRENT_DATETIME()
  );
  
  -- 상태 기록
  INSERT INTO `project.monitoring.pipeline_status` (
    pipeline_name, stage, execution_date, status, completed_at
  ) VALUES (
    'daily_etl', 'stage1_base_processing', CURRENT_DATE(), 
    'COMPLETED', CURRENT_DATETIME()
  );
END;

-- 파이프라인 2: 집계 처리 (매일 02:00, Stage1 완료 후)
CREATE OR REPLACE PROCEDURE `project.etl.stage2_aggregation`()
BEGIN
  DECLARE stage1_completed BOOL DEFAULT FALSE;
  
  -- Stage1 완료 확인
  SET stage1_completed = (
    SELECT COUNT(*) > 0
    FROM `project.monitoring.pipeline_status`
    WHERE pipeline_name = 'daily_etl'
      AND stage = 'stage1_base_processing'
      AND execution_date = CURRENT_DATE()
      AND status = 'COMPLETED'
  );
  
  IF NOT stage1_completed THEN
    RAISE USING MESSAGE = 'Stage1 not completed. Cannot proceed with Stage2.';
  END IF;
  
  -- 집계 처리 실행
  CREATE OR REPLACE TABLE `project.marts.customer_daily_summary` AS
  SELECT 
    c.customer_id,
    c.customer_segment,
    COALESCE(o.order_count, 0) as daily_orders,
    COALESCE(o.daily_revenue, 0) as daily_revenue,
    c.lifetime_value,
    CURRENT_DATE() as summary_date
  FROM `project.master.customers` c
  LEFT JOIN (
    SELECT 
      customer_id,
      COUNT(*) as order_count,
      SUM(amount) as daily_revenue
    FROM `project.staging.cleaned_orders`
    WHERE DATE(order_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
    GROUP BY customer_id
  ) o ON c.customer_id = o.customer_id;
  
  -- Stage2 완료 기록
  INSERT INTO `project.monitoring.pipeline_status` (
    pipeline_name, stage, execution_date, status, completed_at
  ) VALUES (
    'daily_etl', 'stage2_aggregation', CURRENT_DATE(), 
    'COMPLETED', CURRENT_DATETIME()
  );
END;

4. 증분 처리 전략

4.1 날짜 기반 증분 처리

-- 증분 처리를 위한 스케줄된 쿼리
-- Schedule: "every day 03:00"

DECLARE process_date DATE DEFAULT DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY);

-- 1. 이미 처리된 데이터 확인
IF EXISTS (
  SELECT 1 FROM `project.processed.daily_metrics`
  WHERE process_date = process_date
) THEN
  -- 기존 데이터 삭제 (재처리)
  DELETE FROM `project.processed.daily_metrics`
  WHERE process_date = process_date;
END IF;

-- 2. 증분 데이터 처리
INSERT INTO `project.processed.daily_metrics` (
  process_date,
  customer_id,
  total_orders,
  total_revenue,
  created_at
)
SELECT 
  process_date,
  customer_id,
  COUNT(*) as total_orders,
  SUM(amount) as total_revenue,
  CURRENT_DATETIME() as created_at
FROM `project.raw.orders`
WHERE DATE(order_timestamp) = process_date
GROUP BY customer_id;

-- 3. 처리 로그 기록
INSERT INTO `project.monitoring.processing_log` (
  process_date,
  table_name,
  records_processed,
  processing_time_seconds,
  status
)
SELECT 
  process_date,
  'daily_metrics' as table_name,
  COUNT(*) as records_processed,
  TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), SECOND) as processing_time_seconds,
  'SUCCESS' as status
FROM `project.processed.daily_metrics`
WHERE process_date = process_date;

4.2 CDC (Change Data Capture) 패턴

-- 변경 데이터 캡처 기반 증분 처리
CREATE OR REPLACE PROCEDURE `project.etl.cdc_customer_processing`()
BEGIN
  DECLARE last_processed_timestamp TIMESTAMP;
  DECLARE current_batch_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP();
  
  -- 마지막 처리 시점 조회
  SET last_processed_timestamp = (
    SELECT MAX(last_processed_timestamp) 
    FROM `project.monitoring.cdc_watermarks`
    WHERE table_name = 'customers'
  );
  
  -- 기본값 설정 (최초 실행 시)
  IF last_processed_timestamp IS NULL THEN
    SET last_processed_timestamp = TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY);
  END IF;
  
  -- 변경된 레코드만 처리
  MERGE `project.processed.customers` target
  USING (
    SELECT 
      customer_id,
      email,
      first_name,
      last_name,
      status,
      updated_at,
      _change_type  -- INSERT, UPDATE, DELETE
    FROM `project.raw.customers_cdc`
    WHERE updated_at > last_processed_timestamp
      AND updated_at <= current_batch_timestamp
  ) source
  ON target.customer_id = source.customer_id
  WHEN source._change_type = 'DELETE' THEN
    DELETE
  WHEN MATCHED AND source._change_type = 'UPDATE' THEN UPDATE SET
    email = source.email,
    first_name = source.first_name,
    last_name = source.last_name,
    status = source.status,
    updated_at = source.updated_at
  WHEN NOT MATCHED AND source._change_type = 'INSERT' THEN INSERT (
    customer_id, email, first_name, last_name, status, created_at
  ) VALUES (
    source.customer_id, source.email, source.first_name, 
    source.last_name, source.status, CURRENT_TIMESTAMP()
  );
  
  -- 워터마크 업데이트
  MERGE `project.monitoring.cdc_watermarks` target
  USING (SELECT 'customers' as table_name, current_batch_timestamp as last_processed_timestamp) source
  ON target.table_name = source.table_name
  WHEN MATCHED THEN UPDATE SET
    last_processed_timestamp = source.last_processed_timestamp,
    updated_at = CURRENT_TIMESTAMP()
  WHEN NOT MATCHED THEN INSERT (
    table_name, last_processed_timestamp, created_at
  ) VALUES (
    source.table_name, source.last_processed_timestamp, CURRENT_TIMESTAMP()
  );
END;

4.3 배치 크기 최적화

-- 대용량 데이터 배치 처리
CREATE OR REPLACE PROCEDURE `project.etl.batch_processing`()
BEGIN
  DECLARE batch_size INT64 DEFAULT 100000;
  DECLARE offset_value INT64 DEFAULT 0;
  DECLARE total_records INT64;
  DECLARE processed_records INT64 DEFAULT 0;
  
  -- 전체 처리할 레코드 수 확인
  SET total_records = (
    SELECT COUNT(*)
    FROM `project.raw.large_dataset`
    WHERE process_flag = 'PENDING'
  );
  
  -- 배치별 처리 루프
  WHILE processed_records < total_records DO
    -- 배치 처리
    CREATE OR REPLACE TEMP TABLE current_batch AS
    SELECT *
    FROM `project.raw.large_dataset`
    WHERE process_flag = 'PENDING'
    ORDER BY created_at
    LIMIT batch_size OFFSET offset_value;
    
    -- 실제 처리 로직
    INSERT INTO `project.processed.results` (
      record_id,
      processed_data,
      processing_timestamp
    )
    SELECT 
      record_id,
      UPPER(raw_data) as processed_data,
      CURRENT_TIMESTAMP()
    FROM current_batch;
    
    -- 처리 완료 표시
    UPDATE `project.raw.large_dataset`
    SET process_flag = 'COMPLETED',
        processed_at = CURRENT_TIMESTAMP()
    WHERE record_id IN (SELECT record_id FROM current_batch);
    
    -- 진행 상황 업데이트
    SET processed_records = processed_records + batch_size;
    SET offset_value = offset_value + batch_size;
    
    -- 배치 간 잠시 대기 (리소스 부하 분산)
    SELECT SLEEP(1);  -- 1초 대기
    
    -- 진행률 로깅
    INSERT INTO `project.monitoring.batch_progress` (
      batch_timestamp,
      processed_records,
      total_records,
      progress_pct
    ) VALUES (
      CURRENT_TIMESTAMP(),
      processed_records,
      total_records,
      processed_records / total_records * 100
    );
  END WHILE;
END;

5. 오류 처리 및 재시도

5.1 기본 오류 처리

-- 오류 처리가 포함된 스케줄된 쿼리
CREATE OR REPLACE PROCEDURE `project.etl.robust_data_processing`()
BEGIN
  DECLARE error_message STRING;
  DECLARE execution_id STRING DEFAULT GENERATE_UUID();
  
  -- 실행 시작 로깅
  INSERT INTO `project.monitoring.execution_log` (
    execution_id,
    procedure_name,
    status,
    started_at
  ) VALUES (
    execution_id,
    'robust_data_processing',
    'STARTED',
    CURRENT_TIMESTAMP()
  );
  
  BEGIN
    -- 메인 처리 로직
    CREATE OR REPLACE TABLE `project.temp.processing_result` AS
    SELECT 
      customer_id,
      SUM(order_amount) as total_amount,
      COUNT(*) as order_count
    FROM `project.raw.orders`
    WHERE DATE(order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
    GROUP BY customer_id;
    
    -- 데이터 품질 검증
    IF (SELECT COUNT(*) FROM `project.temp.processing_result`) = 0 THEN
      RAISE USING MESSAGE = 'No data found for processing date';
    END IF;
    
    -- 결과 테이블에 삽입
    INSERT INTO `project.processed.daily_customer_summary`
    SELECT *, CURRENT_TIMESTAMP() as processed_at
    FROM `project.temp.processing_result`;
    
    -- 성공 로깅
    UPDATE `project.monitoring.execution_log`
    SET status = 'COMPLETED',
        completed_at = CURRENT_TIMESTAMP(),
        records_processed = (SELECT COUNT(*) FROM `project.temp.processing_result`)
    WHERE execution_id = execution_id;
    
  EXCEPTION WHEN ERROR THEN
    -- 오류 정보 추출
    SET error_message = @@error.message;
    
    -- 오류 로깅
    UPDATE `project.monitoring.execution_log`
    SET status = 'FAILED',
        error_message = error_message,
        failed_at = CURRENT_TIMESTAMP()
    WHERE execution_id = execution_id;
    
    -- 알림 발송
    INSERT INTO `project.notifications.error_alerts` (
      alert_timestamp,
      alert_type,
      procedure_name,
      error_message,
      execution_id
    ) VALUES (
      CURRENT_TIMESTAMP(),
      'PROCESSING_ERROR',
      'robust_data_processing',
      error_message,
      execution_id
    );
    
    -- 오류 재발생 (스케줄된 쿼리 실패로 표시)
    RAISE USING MESSAGE = error_message;
  END;
END;

5.2 지수 백오프 재시도

-- 재시도 로직이 포함된 프로시저
CREATE OR REPLACE PROCEDURE `project.etl.processing_with_retry`()
BEGIN
  DECLARE retry_count INT64 DEFAULT 0;
  DECLARE max_retries INT64 DEFAULT 3;
  DECLARE base_delay_seconds INT64 DEFAULT 60;
  DECLARE success BOOL DEFAULT FALSE;
  DECLARE error_message STRING;
  
  WHILE retry_count <= max_retries AND NOT success DO
    BEGIN
      -- 재시도 시작 로깅
      INSERT INTO `project.monitoring.retry_log` (
        retry_timestamp,
        procedure_name,
        retry_attempt,
        status
      ) VALUES (
        CURRENT_TIMESTAMP(),
        'processing_with_retry',
        retry_count,
        'ATTEMPTING'
      );
      
      -- 메인 처리 로직
      CALL `project.etl.main_processing_logic`();
      
      -- 성공 시 플래그 설정
      SET success = TRUE;
      
      -- 성공 로깅
      UPDATE `project.monitoring.retry_log`
      SET status = 'SUCCESS'
      WHERE procedure_name = 'processing_with_retry'
        AND retry_attempt = retry_count
        AND DATE(retry_timestamp) = CURRENT_DATE();
        
    EXCEPTION WHEN ERROR THEN
      SET error_message = @@error.message;
      SET retry_count = retry_count + 1;
      
      -- 실패 로깅
      UPDATE `project.monitoring.retry_log`
      SET status = 'FAILED',
          error_message = error_message
      WHERE procedure_name = 'processing_with_retry'
        AND retry_attempt = retry_count - 1
        AND DATE(retry_timestamp) = CURRENT_DATE();
      
      -- 마지막 재시도가 아니면 대기
      IF retry_count <= max_retries THEN
        -- 지수 백오프: 60초, 120초, 240초 대기
        SELECT SLEEP(base_delay_seconds * POWER(2, retry_count - 1));
      END IF;
    END;
  END WHILE;
  
  -- 최종 실패 처리
  IF NOT success THEN
    INSERT INTO `project.notifications.critical_alerts` (
      alert_timestamp,
      alert_type,
      procedure_name,
      final_error_message,
      retry_attempts
    ) VALUES (
      CURRENT_TIMESTAMP(),
      'MAX_RETRIES_EXCEEDED',
      'processing_with_retry',
      error_message,
      max_retries
    );
    
    RAISE USING MESSAGE = CONCAT('Max retries exceeded: ', error_message);
  END IF;
END;

5.3 부분 실패 처리

-- 부분 실패를 허용하는 견고한 처리
CREATE OR REPLACE PROCEDURE `project.etl.fault_tolerant_processing`()
BEGIN
  DECLARE total_batches INT64;
  DECLARE successful_batches INT64 DEFAULT 0;
  DECLARE failed_batches INT64 DEFAULT 0;
  
  -- 배치 목록 생성
  CREATE OR REPLACE TEMP TABLE batch_list AS
  SELECT 
    customer_segment,
    COUNT(*) as customer_count
  FROM `project.raw.customers`
  GROUP BY customer_segment;
  
  SET total_batches = (SELECT COUNT(*) FROM batch_list);
  
  -- 각 배치별 처리
  FOR batch IN (SELECT customer_segment FROM batch_list) DO
    BEGIN
      -- 개별 배치 처리
      INSERT INTO `project.processed.segment_summary` (
        segment,
        customer_count,
        avg_order_value,
        processing_date
      )
      SELECT 
        c.customer_segment,
        COUNT(*) as customer_count,
        AVG(o.order_amount) as avg_order_value,
        CURRENT_DATE()
      FROM `project.raw.customers` c
      LEFT JOIN `project.raw.orders` o ON c.customer_id = o.customer_id
      WHERE c.customer_segment = batch.customer_segment
        AND DATE(o.order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
      GROUP BY c.customer_segment;
      
      SET successful_batches = successful_batches + 1;
      
      -- 배치 성공 로깅
      INSERT INTO `project.monitoring.batch_results` (
        batch_date,
        batch_id,
        batch_status,
        processed_at
      ) VALUES (
        CURRENT_DATE(),
        batch.customer_segment,
        'SUCCESS',
        CURRENT_TIMESTAMP()
      );
      
    EXCEPTION WHEN ERROR THEN
      SET failed_batches = failed_batches + 1;
      
      -- 배치 실패 로깅
      INSERT INTO `project.monitoring.batch_results` (
        batch_date,
        batch_id,
        batch_status,
        error_message,
        processed_at
      ) VALUES (
        CURRENT_DATE(),
        batch.customer_segment,
        'FAILED',
        @@error.message,
        CURRENT_TIMESTAMP()
      );
    END;
  END FOR;
  
  -- 전체 처리 결과 평가
  INSERT INTO `project.monitoring.job_summary` (
    job_date,
    job_name,
    total_batches,
    successful_batches,
    failed_batches,
    success_rate,
    job_status
  ) VALUES (
    CURRENT_DATE(),
    'fault_tolerant_processing',
    total_batches,
    successful_batches,
    failed_batches,
    successful_batches / total_batches * 100,
    CASE 
      WHEN failed_batches = 0 THEN 'FULL_SUCCESS'
      WHEN successful_batches > 0 THEN 'PARTIAL_SUCCESS'
      ELSE 'FULL_FAILURE'
    END
  );
  
  -- 실패율이 50%를 초과하면 알림
  IF failed_batches / total_batches > 0.5 THEN
    INSERT INTO `project.notifications.job_alerts` (
      alert_timestamp,
      job_name,
      alert_type,
      message
    ) VALUES (
      CURRENT_TIMESTAMP(),
      'fault_tolerant_processing',
      'HIGH_FAILURE_RATE',
      CONCAT('Job completed with ', failed_batches, ' out of ', total_batches, ' batches failed')
    );
  END IF;
END;

6. 모니터링 및 알림

6.1 실행 상태 모니터링

-- 스케줄된 쿼리 실행 상태 모니터링 대시보드
CREATE OR REPLACE VIEW `project.monitoring.scheduled_query_dashboard` AS
WITH execution_summary AS (
  SELECT 
    job_id,
    user_email,
    project_id,
    job_type,
    statement_type,
    start_time,
    end_time,
    TIMESTAMP_DIFF(end_time, start_time, SECOND) as duration_seconds,
    state,
    error_result,
    total_bytes_processed,
    total_slot_ms,
    creation_time
  FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
  WHERE DATE(creation_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
    AND job_type = 'QUERY'
    AND user_email LIKE '%scheduled-query%'
),
daily_stats AS (
  SELECT 
    DATE(start_time) as execution_date,
    COUNT(*) as total_executions,
    COUNT(CASE WHEN state = 'DONE' AND error_result IS NULL THEN 1 END) as successful_executions,
    COUNT(CASE WHEN state = 'DONE' AND error_result IS NOT NULL THEN 1 END) as failed_executions,
    AVG(duration_seconds) as avg_duration_seconds,
    MAX(duration_seconds) as max_duration_seconds,
    SUM(total_bytes_processed) / 1024 / 1024 / 1024 as total_gb_processed,
    SUM(total_slot_ms) / 1000 / 60 / 60 as total_slot_hours
  FROM execution_summary
  WHERE start_time IS NOT NULL
  GROUP BY execution_date
)
SELECT 
  execution_date,
  total_executions,
  successful_executions,
  failed_executions,
  ROUND(successful_executions / total_executions * 100, 2) as success_rate_pct,
  ROUND(avg_duration_seconds, 2) as avg_duration_seconds,
  max_duration_seconds,
  ROUND(total_gb_processed, 2) as total_gb_processed,
  ROUND(total_slot_hours, 2) as total_slot_hours
FROM daily_stats
ORDER BY execution_date DESC;

6.2 성능 알림

-- 성능 이상 감지 및 알림
CREATE OR REPLACE PROCEDURE `project.monitoring.performance_alerts`()
BEGIN
  DECLARE avg_duration FLOAT64;
  DECLARE current_duration FLOAT64;
  DECLARE threshold_multiplier FLOAT64 DEFAULT 2.0;
  
  -- 지난 7일 평균 실행 시간 계산
  SET avg_duration = (
    SELECT AVG(TIMESTAMP_DIFF(end_time, start_time, SECOND))
    FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
    WHERE DATE(creation_time) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 8 DAY) 
                                  AND DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
      AND job_type = 'QUERY'
      AND user_email LIKE '%scheduled-query%'
      AND state = 'DONE'
      AND error_result IS NULL
  );
  
  -- 오늘 실행 중 가장 느린 쿼리 시간
  SET current_duration = (
    SELECT MAX(TIMESTAMP_DIFF(end_time, start_time, SECOND))
    FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
    WHERE DATE(creation_time) = CURRENT_DATE()
      AND job_type = 'QUERY'
      AND user_email LIKE '%scheduled-query%'
      AND state = 'DONE'
  );
  
  -- 성능 이상 감지
  IF current_duration > avg_duration * threshold_multiplier THEN
    INSERT INTO `project.notifications.performance_alerts` (
      alert_timestamp,
      alert_type,
      current_duration_seconds,
      average_duration_seconds,
      threshold_exceeded_by,
      recommended_action
    ) VALUES (
      CURRENT_TIMESTAMP(),
      'SLOW_EXECUTION',
      current_duration,
      avg_duration,
      ROUND((current_duration / avg_duration - 1) * 100, 2),
      'Review query performance and consider optimization'
    );
  END IF;
  
  -- 슬롯 사용량 이상 감지
  IF EXISTS (
    SELECT 1
    FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
    WHERE DATE(creation_time) = CURRENT_DATE()
      AND total_slot_ms > 3600000  -- 1시간 이상 슬롯 사용
      AND job_type = 'QUERY'
  ) THEN
    INSERT INTO `project.notifications.resource_alerts` (
      alert_timestamp,
      alert_type,
      message
    ) VALUES (
      CURRENT_TIMESTAMP(),
      'HIGH_SLOT_USAGE',
      'Scheduled query used excessive slot time today'
    );
  END IF;
END;

6.3 데이터 품질 알림

-- 데이터 품질 모니터링 및 알림
CREATE OR REPLACE PROCEDURE `project.monitoring.data_quality_alerts`()
BEGIN
  -- 데이터 볼륨 이상 감지
  DECLARE yesterday_count INT64;
  DECLARE today_count INT64;
  DECLARE volume_change_pct FLOAT64;
  
  SET yesterday_count = (
    SELECT COUNT(*)
    FROM `project.processed.daily_summary`
    WHERE summary_date = DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)
  );
  
  SET today_count = (
    SELECT COUNT(*)
    FROM `project.processed.daily_summary` 
    WHERE summary_date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  );
  
  SET volume_change_pct = (today_count - yesterday_count) / yesterday_count * 100;
  
  -- 볼륨 변화가 30% 이상이면 알림
  IF ABS(volume_change_pct) > 30 THEN
    INSERT INTO `project.notifications.data_quality_alerts` (
      alert_timestamp,
      alert_type,
      table_name,
      metric_name,
      expected_value,
      actual_value,
      deviation_pct
    ) VALUES (
      CURRENT_TIMESTAMP(),
      'VOLUME_ANOMALY',
      'daily_summary',
      'row_count',
      yesterday_count,
      today_count,
      volume_change_pct
    );
  END IF;
  
  -- NULL 값 비율 확인
  FOR table_record IN (
    SELECT 
      table_name,
      column_name,
      null_count,
      total_count,
      null_count / total_count * 100 as null_pct
    FROM (
      SELECT 
        'customer_summary' as table_name,
        'customer_id' as column_name,
        COUNTIF(customer_id IS NULL) as null_count,
        COUNT(*) as total_count
      FROM `project.processed.customer_summary`
      WHERE summary_date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
    )
    WHERE null_count / total_count > 0.05  -- 5% 이상 NULL
  ) DO
    INSERT INTO `project.notifications.data_quality_alerts` (
      alert_timestamp,
      alert_type,
      table_name,
      column_name,
      metric_name,
      threshold_value,
      actual_value
    ) VALUES (
      CURRENT_TIMESTAMP(),
      'HIGH_NULL_RATE',
      table_record.table_name,
      table_record.column_name,
      'null_percentage',
      5.0,
      table_record.null_pct
    );
  END FOR;
  
  -- 중복 데이터 감지
  IF EXISTS (
    SELECT customer_id
    FROM `project.processed.daily_summary`
    WHERE summary_date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
    GROUP BY customer_id
    HAVING COUNT(*) > 1
  ) THEN
    INSERT INTO `project.notifications.data_quality_alerts` (
      alert_timestamp,
      alert_type,
      table_name,
      message
    ) VALUES (
      CURRENT_TIMESTAMP(),
      'DUPLICATE_RECORDS',
      'daily_summary',
      'Duplicate customer records found in daily summary'
    );
  END IF;
END;

7. 성능 최적화

7.1 쿼리 최적화

-- 비효율적인 스케줄된 쿼리 (피해야 할 패턴)
/*
SELECT 
  customer_id,
  (SELECT COUNT(*) FROM orders o WHERE o.customer_id = c.customer_id) as order_count,
  (SELECT SUM(amount) FROM orders o WHERE o.customer_id = c.customer_id) as total_spent
FROM customers c;  -- N+1 문제 발생
*/

-- 최적화된 스케줄된 쿼리
CREATE OR REPLACE TABLE `project.marts.customer_metrics` AS
SELECT 
  c.customer_id,
  c.customer_name,
  c.registration_date,
  COALESCE(o.order_count, 0) as order_count,
  COALESCE(o.total_spent, 0) as total_spent,
  COALESCE(o.avg_order_value, 0) as avg_order_value,
  CURRENT_TIMESTAMP() as updated_at
FROM `project.master.customers` c
LEFT JOIN (
  SELECT 
    customer_id,
    COUNT(*) as order_count,
    SUM(amount) as total_spent,
    AVG(amount) as avg_order_value
  FROM `project.raw.orders`
  WHERE DATE(order_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  GROUP BY customer_id
) o ON c.customer_id = o.customer_id
WHERE c.status = 'ACTIVE';

7.2 파티션 활용 최적화

-- 파티션 테이블을 활용한 효율적인 처리
-- Schedule: "every day 01:00"

-- 파티션 프루닝을 위한 명시적 날짜 필터
DECLARE target_date DATE DEFAULT DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY);

-- 이전 파티션 데이터 삭제 (재처리 시)
DELETE FROM `project.partitioned.daily_aggregates`
WHERE partition_date = target_date;

-- 효율적인 파티션 데이터 삽입
INSERT INTO `project.partitioned.daily_aggregates` (
  partition_date,
  customer_id,
  order_count,
  total_revenue,
  created_at
)
SELECT 
  target_date as partition_date,
  customer_id,
  COUNT(*) as order_count,
  SUM(amount) as total_revenue,
  CURRENT_TIMESTAMP() as created_at
FROM `project.partitioned.orders`
WHERE DATE(_PARTITIONTIME) = target_date  -- 파티션 필터
  AND amount > 0
GROUP BY customer_id;

-- 파티션 메타데이터 업데이트
INSERT INTO `project.monitoring.partition_stats` (
  table_name,
  partition_date,
  record_count,
  size_gb,
  created_at
)
SELECT 
  'daily_aggregates' as table_name,
  target_date as partition_date,
  COUNT(*) as record_count,
  -- 실제 크기는 INFORMATION_SCHEMA에서 조회
  0 as size_gb,
  CURRENT_TIMESTAMP() as created_at
FROM `project.partitioned.daily_aggregates`
WHERE partition_date = target_date;

7.3 클러스터링 최적화

-- 클러스터링된 테이블로 성능 향상
CREATE OR REPLACE TABLE `project.optimized.customer_orders` (
  customer_id STRING,
  order_date DATE,
  product_category STRING,
  order_amount FLOAT64,
  order_count INT64,
  created_at TIMESTAMP
)
PARTITION BY order_date
CLUSTER BY customer_id, product_category;

-- 클러스터링을 활용한 효율적인 쿼리
-- Schedule: "every day 02:00"

INSERT INTO `project.optimized.customer_orders` (
  customer_id,
  order_date,
  product_category,
  order_amount,
  order_count,
  created_at
)
SELECT 
  o.customer_id,
  DATE(o.order_timestamp) as order_date,
  p.category as product_category,
  SUM(o.amount) as order_amount,
  COUNT(*) as order_count,
  CURRENT_TIMESTAMP() as created_at
FROM `project.raw.orders` o
JOIN `project.master.products` p ON o.product_id = p.product_id
WHERE DATE(o.order_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY 
  o.customer_id,
  DATE(o.order_timestamp),
  p.category;

-- 클러스터링 효과 확인
SELECT 
  table_name,
  clustering_ordinal_position,
  clustering_column_name
FROM `project.optimized.INFORMATION_SCHEMA.CLUSTERING_COLUMNS`
WHERE table_name = 'customer_orders';

8. 실제 활용 사례

8.1 실시간 대시보드 데이터 준비

-- 실시간 비즈니스 대시보드를 위한 데이터 파이프라인
-- Schedule: "every 15 minutes"

-- 1. 실시간 KPI 계산
CREATE OR REPLACE TABLE `project.dashboard.realtime_kpis` AS
WITH current_metrics AS (
  SELECT 
    CURRENT_DATETIME() as snapshot_time,
    
    -- 매출 메트릭
    COUNT(DISTINCT order_id) as orders_last_15min,
    SUM(order_amount) as revenue_last_15min,
    COUNT(DISTINCT customer_id) as active_customers_last_15min,
    
    -- 전환 메트릭
    COUNT(DISTINCT CASE WHEN funnel_step = 'purchase' THEN session_id END) as conversions_last_15min,
    COUNT(DISTINCT session_id) as total_sessions_last_15min
    
  FROM `project.realtime.user_events`
  WHERE event_timestamp >= DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 15 MINUTE)
),
comparison_metrics AS (
  SELECT 
    AVG(orders_last_15min) as avg_orders_15min,
    AVG(revenue_last_15min) as avg_revenue_15min
  FROM `project.dashboard.realtime_kpis`
  WHERE DATE(snapshot_time) = CURRENT_DATE()
    AND snapshot_time <= DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 15 MINUTE)
)
SELECT 
  cm.*,
  
  -- 전시간 대비 변화율
  CASE 
    WHEN comp.avg_orders_15min > 0 THEN 
      (cm.orders_last_15min - comp.avg_orders_15min) / comp.avg_orders_15min * 100
    ELSE 0 
  END as orders_change_pct,
  
  CASE 
    WHEN comp.avg_revenue_15min > 0 THEN 
      (cm.revenue_last_15min - comp.avg_revenue_15min) / comp.avg_revenue_15min * 100
    ELSE 0 
  END as revenue_change_pct,
  
  -- 전환율 계산
  CASE 
    WHEN cm.total_sessions_last_15min > 0 THEN 
      cm.conversions_last_15min / cm.total_sessions_last_15min * 100
    ELSE 0 
  END as conversion_rate_pct
  
FROM current_metrics cm
CROSS JOIN comparison_metrics comp;

-- 2. 지역별 성과 분석
CREATE OR REPLACE TABLE `project.dashboard.regional_performance` AS
SELECT 
  region,
  COUNT(DISTINCT order_id) as orders,
  SUM(order_amount) as revenue,
  COUNT(DISTINCT customer_id) as customers,
  AVG(order_amount) as avg_order_value,
  
  -- 전일 동시간대 대비
  LAG(COUNT(DISTINCT order_id)) OVER (
    PARTITION BY region 
    ORDER BY EXTRACT(HOUR FROM CURRENT_DATETIME())
  ) as orders_same_time_yesterday,
  
  CURRENT_DATETIME() as updated_at
FROM `project.realtime.user_events`
WHERE event_timestamp >= DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 1 HOUR)
  AND funnel_step = 'purchase'
GROUP BY region;

8.2 고객 세그멘테이션 자동화

-- 일일 고객 세그멘테이션 업데이트
-- Schedule: "every day 04:00"

CREATE OR REPLACE PROCEDURE `project.ml.update_customer_segments`()
BEGIN
  -- RFM 분석 기반 세그멘테이션
  CREATE OR REPLACE TABLE `project.analytics.customer_rfm_scores` AS
  WITH customer_metrics AS (
    SELECT 
      customer_id,
      DATE_DIFF(CURRENT_DATE(), MAX(order_date), DAY) as recency_days,
      COUNT(DISTINCT order_id) as frequency_orders,
      SUM(order_amount) as monetary_total,
      AVG(order_amount) as monetary_avg,
      MIN(order_date) as first_order_date,
      MAX(order_date) as last_order_date
    FROM `project.raw.orders`
    WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
    GROUP BY customer_id
  ),
  rfm_scores AS (
    SELECT 
      customer_id,
      recency_days,
      frequency_orders,
      monetary_total,
      
      -- RFM 스코어 계산 (1-5점)
      CASE 
        WHEN recency_days <= 30 THEN 5
        WHEN recency_days <= 90 THEN 4
        WHEN recency_days <= 180 THEN 3
        WHEN recency_days <= 365 THEN 2
        ELSE 1
      END as recency_score,
      
      NTILE(5) OVER (ORDER BY frequency_orders) as frequency_score,
      NTILE(5) OVER (ORDER BY monetary_total) as monetary_score
    FROM customer_metrics
  )
  SELECT 
    customer_id,
    recency_days,
    frequency_orders,
    monetary_total,
    recency_score,
    frequency_score,
    monetary_score,
    
    -- 종합 세그먼트 분류
    CASE 
      WHEN recency_score >= 4 AND frequency_score >= 4 AND monetary_score >= 4 THEN 'Champions'
      WHEN recency_score >= 3 AND frequency_score >= 3 AND monetary_score >= 4 THEN 'Loyal Customers'
      WHEN recency_score >= 4 AND frequency_score <= 2 THEN 'New Customers'
      WHEN recency_score >= 3 AND frequency_score >= 3 AND monetary_score <= 2 THEN 'Potential Loyalists'
      WHEN recency_score >= 3 AND frequency_score <= 2 AND monetary_score <= 2 THEN 'Promising'
      WHEN recency_score <= 2 AND frequency_score >= 3 AND monetary_score >= 3 THEN 'At Risk'
      WHEN recency_score <= 2 AND frequency_score >= 3 AND monetary_score <= 2 THEN 'Cannot Lose Them'
      WHEN recency_score <= 2 AND frequency_score <= 2 AND monetary_score >= 3 THEN 'Hibernating'
      ELSE 'Lost'
    END as customer_segment,
    
    CURRENT_DATE() as segment_date
  FROM rfm_scores;
  
  -- 세그먼트별 마케팅 액션 생성
  CREATE OR REPLACE TABLE `project.marketing.segment_actions` AS
  SELECT 
    customer_segment,
    COUNT(*) as customer_count,
    AVG(monetary_total) as avg_customer_value,
    
    -- 세그먼트별 추천 액션
    CASE customer_segment
      WHEN 'Champions' THEN 'Reward loyalty, ask for referrals, offer new products'
      WHEN 'Loyal Customers' THEN 'Upsell higher value products, offer loyalty programs'
      WHEN 'New Customers' THEN 'Provide onboarding support, build relationship'
      WHEN 'At Risk' THEN 'Send personalized offers, provide special customer support'
      WHEN 'Cannot Lose Them' THEN 'Aggressive retention campaign, exclusive offers'
      WHEN 'Lost' THEN 'Ignore or very low-cost re-engagement campaigns'
      ELSE 'Standard marketing approach'
    END as recommended_action,
    
    -- 예상 마케팅 예산 (고객 가치 기반)
    COUNT(*) * AVG(monetary_total) * 0.05 as suggested_marketing_budget,
    
    CURRENT_DATE() as action_date
  FROM `project.analytics.customer_rfm_scores`
  GROUP BY customer_segment;
  
  -- 세그멘트 변화 추적
  INSERT INTO `project.analytics.segment_history` (
    tracking_date,
    customer_id,
    previous_segment,
    current_segment,
    segment_change_type
  )
  SELECT 
    CURRENT_DATE(),
    COALESCE(current.customer_id, previous.customer_id),
    previous.customer_segment,
    current.customer_segment,
    CASE 
      WHEN previous.customer_segment IS NULL THEN 'NEW_CUSTOMER'
      WHEN current.customer_segment IS NULL THEN 'CHURNED'
      WHEN previous.customer_segment != current.customer_segment THEN 'SEGMENT_CHANGE'
      ELSE 'NO_CHANGE'
    END
  FROM `project.analytics.customer_rfm_scores` current
  FULL OUTER JOIN (
    SELECT customer_id, customer_segment
    FROM `project.analytics.customer_rfm_scores`
    WHERE segment_date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  ) previous ON current.customer_id = previous.customer_id
  WHERE previous.customer_segment != current.customer_segment
     OR previous.customer_segment IS NULL
     OR current.customer_segment IS NULL;
     
END;

8.3 재고 최적화 자동화

-- 자동 재고 보충 알고리즘
-- Schedule: "every day 06:00"

CREATE OR REPLACE PROCEDURE `project.supply_chain.automated_inventory_management`()
BEGIN
  -- 1. 수요 예측 기반 재고 계산
  CREATE OR REPLACE TABLE `project.supply_chain.inventory_recommendations` AS
  WITH sales_velocity AS (
    SELECT 
      product_id,
      warehouse_id,
      AVG(daily_sales) as avg_daily_sales,
      STDDEV(daily_sales) as sales_stddev,
      MAX(daily_sales) as max_daily_sales
    FROM (
      SELECT 
        product_id,
        warehouse_id,
        DATE(order_date) as sale_date,
        SUM(quantity) as daily_sales
      FROM `project.raw.orders`
      WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY)
      GROUP BY product_id, warehouse_id, DATE(order_date)
    )
    GROUP BY product_id, warehouse_id
    HAVING COUNT(*) >= 30  -- 최소 30일 데이터 필요
  ),
  demand_forecast AS (
    SELECT 
      sv.product_id,
      sv.warehouse_id,
      sv.avg_daily_sales,
      
      -- 7일 예상 수요 (95% 신뢰구간)
      CEIL(sv.avg_daily_sales * 7 + 1.96 * sv.sales_stddev * SQRT(7)) as forecast_7days,
      
      -- 안전재고 (2주치 + 변동성 고려)
      CEIL(sv.avg_daily_sales * 14 + 2 * sv.sales_stddev * SQRT(14)) as safety_stock,
      
      -- 최대 재고 (1개월 + 변동성)
      CEIL(sv.avg_daily_sales * 30 + 1.96 * sv.sales_stddev * SQRT(30)) as max_stock
      
    FROM sales_velocity sv
  ),
  current_inventory AS (
    SELECT 
      product_id,
      warehouse_id,
      current_stock,
      reserved_stock,
      available_stock,
      last_updated
    FROM `project.supply_chain.inventory_status`
    WHERE DATE(last_updated) = CURRENT_DATE()
  )
  SELECT 
    df.product_id,
    df.warehouse_id,
    p.product_name,
    p.unit_cost,
    df.avg_daily_sales,
    df.forecast_7days,
    df.safety_stock,
    df.max_stock,
    COALESCE(ci.available_stock, 0) as current_available,
    
    -- 재주문 필요량 계산
    GREATEST(0, df.safety_stock - COALESCE(ci.available_stock, 0)) as reorder_quantity,
    
    -- 재주문 우선순위 (매출 기여도 기반)
    CASE 
      WHEN COALESCE(ci.available_stock, 0) = 0 THEN 'URGENT'
      WHEN COALESCE(ci.available_stock, 0) <= df.avg_daily_sales * 3 THEN 'HIGH'
      WHEN COALESCE(ci.available_stock, 0) <= df.safety_stock THEN 'MEDIUM'
      WHEN COALESCE(ci.available_stock, 0) >= df.max_stock THEN 'OVERSTOCK'
      ELSE 'NORMAL'
    END as priority,
    
    -- 예상 재고 소진일
    CASE 
      WHEN df.avg_daily_sales > 0 THEN 
        COALESCE(ci.available_stock, 0) / df.avg_daily_sales
      ELSE 999
    END as days_until_stockout,
    
    -- 주문 비용 추정
    GREATEST(0, df.safety_stock - COALESCE(ci.available_stock, 0)) * p.unit_cost as order_cost,
    
    CURRENT_DATE() as recommendation_date
    
  FROM demand_forecast df
  JOIN `project.master.products` p ON df.product_id = p.product_id
  LEFT JOIN current_inventory ci ON df.product_id = ci.product_id 
                                AND df.warehouse_id = ci.warehouse_id;
  
  -- 2. 자동 주문 생성 (긴급 + 고우선순위)
  INSERT INTO `project.supply_chain.purchase_orders` (
    order_date,
    product_id,
    warehouse_id,
    quantity_ordered,
    unit_cost,
    total_cost,
    priority,
    order_type,
    expected_delivery_date,
    status
  )
  SELECT 
    CURRENT_DATE(),
    product_id,
    warehouse_id,
    reorder_quantity,
    unit_cost,
    order_cost,
    priority,
    'AUTO_REORDER' as order_type,
    DATE_ADD(CURRENT_DATE(), INTERVAL 3 DAY) as expected_delivery_date,
    'PENDING' as status
  FROM `project.supply_chain.inventory_recommendations`
  WHERE priority IN ('URGENT', 'HIGH')
    AND reorder_quantity > 0;
  
  -- 3. 알림 생성
  INSERT INTO `project.notifications.inventory_alerts` (
    alert_date,
    alert_type,
    product_count,
    total_order_value,
    urgent_items,
    high_priority_items
  )
  SELECT 
    CURRENT_DATE(),
    'AUTOMATED_REORDER',
    COUNT(*),
    SUM(order_cost),
    COUNT(CASE WHEN priority = 'URGENT' THEN 1 END),
    COUNT(CASE WHEN priority = 'HIGH' THEN 1 END)
  FROM `project.supply_chain.inventory_recommendations`
  WHERE reorder_quantity > 0;
  
END;

9. 모범 사례

9.1 스케줄 설계 원칙

시간대 및 타이밍 고려사항

-- 좋은 예: 명확한 시간대 설정과 비즈니스 로직 고려
-- 매일 새벽 2시 (데이터 소스 업데이트 완료 후)
-- Schedule: "every day 02:00"
-- Timezone: "Asia/Seoul"

-- 나쁜 예: 시간대 미설정으로 UTC 기준 실행
-- Schedule: "every day 02:00" (시간대 설정 없음)

의존성 관리

-- 스케줄된 쿼리 의존성 매핑 테이블
CREATE OR REPLACE TABLE `project.config.schedule_dependencies` (
  parent_schedule STRING,
  child_schedule STRING,
  dependency_type STRING, -- 'HARD', 'SOFT'
  max_wait_minutes INT64,
  created_at TIMESTAMP
);

-- 의존성 체크 함수
CREATE OR REPLACE FUNCTION `project.utils.check_dependency_ready`(
  parent_schedule STRING
) RETURNS BOOL
LANGUAGE SQL AS (
  SELECT 
    CASE 
      WHEN MAX(end_time) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 MINUTE)
           AND MAX(state) = 'DONE' 
           AND MAX(error_result) IS NULL 
      THEN TRUE 
      ELSE FALSE 
    END
  FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
  WHERE job_id LIKE CONCAT('%', parent_schedule, '%')
    AND DATE(creation_time) = CURRENT_DATE()
);

-- 의존성을 고려한 조건부 실행
CREATE OR REPLACE PROCEDURE `project.etl.dependent_processing`()
BEGIN
  DECLARE dependencies_ready BOOL DEFAULT FALSE;
  
  -- 상위 작업 완료 확인
  SET dependencies_ready = `project.utils.check_dependency_ready`('parent_etl_job');
  
  IF dependencies_ready THEN
    -- 실제 처리 로직 실행
    CALL `project.etl.main_processing_logic`();
  ELSE
    -- 의존성 미충족 알림
    INSERT INTO `project.monitoring.dependency_alerts` (
      alert_time,
      job_name,
      missing_dependency,
      message
    ) VALUES (
      CURRENT_TIMESTAMP(),
      'dependent_processing',
      'parent_etl_job',
      'Parent job not completed within expected timeframe'
    );
  END IF;
END;

리소스 효율적 스케줄링

-- 리소스 사용량 분산을 위한 스케줄 설계
CREATE OR REPLACE TABLE `project.config.optimal_schedule_slots` (
  time_slot STRING,
  max_concurrent_jobs INT64,
  current_jobs INT64,
  resource_weight FLOAT64,
  recommended_for STRING
);

INSERT INTO `project.config.optimal_schedule_slots` VALUES
('00:00-02:00', 5, 0, 1.0, '대용량 ETL, 데이터 마이그레이션'),
('02:00-06:00', 8, 0, 0.8, '일일 집계, 리포트 생성'),
('06:00-09:00', 3, 0, 0.6, '비즈니스 크리티컬 작업'),
('09:00-18:00', 2, 0, 0.3, '실시간 모니터링만'),
('18:00-00:00', 4, 0, 0.5, '주간/월간 분석 작업');

-- 최적 스케줄 시간 추천 함수
CREATE OR REPLACE PROCEDURE `project.utils.recommend_schedule_time`(
  job_type STRING,
  estimated_duration_minutes INT64
)
BEGIN
  SELECT 
    time_slot,
    max_concurrent_jobs - current_jobs as available_slots,
    recommended_for
  FROM `project.config.optimal_schedule_slots`
  WHERE REGEXP_CONTAINS(recommended_for, job_type)
    AND current_jobs < max_concurrent_jobs
  ORDER BY resource_weight DESC, available_slots DESC
  LIMIT 3;
END;

장애 복구 전략

-- 자동 복구 메커니즘
CREATE OR REPLACE PROCEDURE `project.recovery.auto_recovery_scheduler`()
BEGIN
  DECLARE failed_jobs ARRAY<STRING>;
  DECLARE job_name STRING;
  
  -- 지난 4시간 내 실패한 크리티컬 작업 조회
  SET failed_jobs = ARRAY(
    SELECT job_id
    FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
    WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 4 HOUR)
      AND state = 'DONE'
      AND error_result IS NOT NULL
      AND labels.key = 'critical'
      AND labels.value = 'true'
  );
  
  -- 각 실패 작업에 대해 복구 시도
  FOR job IN (SELECT job_id FROM UNNEST(failed_jobs) as job_id) DO
    -- 복구 조건 확인 (최대 3회 시도)
    IF (SELECT COUNT(*) FROM `project.monitoring.recovery_attempts` 
        WHERE original_job_id = job.job_id AND DATE(attempt_date) = CURRENT_DATE()) < 3 THEN
      
      -- 복구 작업 실행
      CALL `project.recovery.retry_failed_job`(job.job_id);
      
      -- 복구 시도 로그
      INSERT INTO `project.monitoring.recovery_attempts` (
        original_job_id,
        attempt_date,
        recovery_method,
        initiated_by
      ) VALUES (
        job.job_id,
        CURRENT_DATE(),
        'auto_retry',
        'system'
      );
    END IF;
  END FOR;
END;

환경별 스케줄 관리

-- 환경별 스케줄 설정 관리
CREATE OR REPLACE TABLE `project.config.environment_schedules` (
  schedule_name STRING,
  dev_schedule STRING,
  staging_schedule STRING,
  prod_schedule STRING,
  is_active_dev BOOL,
  is_active_staging BOOL,
  is_active_prod BOOL
);

INSERT INTO `project.config.environment_schedules` VALUES
('daily_etl', 'every 2 hours', 'every day 01:00', 'every day 02:00', true, true, true),
('weekly_report', 'every day 10:00', 'every monday 09:00', 'every monday 08:00', false, true, true),
('monthly_closing', 'every day 15:00', '1 of month 10:00', '1 of month 06:00', false, true, true);

-- 환경에 따른 동적 스케줄 설정
CREATE OR REPLACE FUNCTION `project.utils.get_schedule_for_environment`(
  schedule_name STRING,
  environment STRING
) RETURNS STRING
LANGUAGE SQL AS (
  CASE environment
    WHEN 'dev' THEN (SELECT dev_schedule FROM `project.config.environment_schedules` WHERE schedule_name = schedule_name AND is_active_dev)
    WHEN 'staging' THEN (SELECT staging_schedule FROM `project.config.environment_schedules` WHERE schedule_name = schedule_name AND is_active_staging)
    WHEN 'prod' THEN (SELECT prod_schedule FROM `project.config.environment_schedules` WHERE schedule_name = schedule_name AND is_active_prod)
    ELSE 'invalid_environment'
  END
);

9.2 코드 구조화 및 모듈화

-- 1. 공통 함수들을 별도 데이터세트에 구성
-- `project.utils.date_functions`

CREATE OR REPLACE FUNCTION `project.utils.get_business_days_between`(
  start_date DATE,
  end_date DATE
) RETURNS INT64
LANGUAGE SQL AS (
  (SELECT COUNT(*)
   FROM UNNEST(GENERATE_DATE_ARRAY(start_date, end_date)) as d
   WHERE EXTRACT(DAYOFWEEK FROM d) BETWEEN 2 AND 6)  -- 월-금
);

-- 2. 설정 테이블 활용
CREATE OR REPLACE TABLE `project.config.etl_parameters` (
  parameter_name STRING,
  parameter_value STRING,
  parameter_type STRING,
  description STRING,
  updated_at TIMESTAMP
);

INSERT INTO `project.config.etl_parameters` VALUES
('batch_size', '10000', 'INTEGER', 'Default batch processing size', CURRENT_TIMESTAMP()),
('retention_days', '365', 'INTEGER', 'Data retention period', CURRENT_TIMESTAMP()),
('alert_email', 'data-team@company.com', 'STRING', 'Alert notification email', CURRENT_TIMESTAMP());

-- 3. 파라미터화된 프로시저
CREATE OR REPLACE PROCEDURE `project.etl.configurable_processing`(
  process_date DATE,
  batch_size INT64
)
BEGIN
  DECLARE actual_batch_size INT64;
  
  -- 설정 테이블에서 파라미터 조회
  SET actual_batch_size = COALESCE(
    batch_size,
    (SELECT CAST(parameter_value AS INT64) 
     FROM `project.config.etl_parameters` 
     WHERE parameter_name = 'batch_size')
  );
  
  -- 실제 처리 로직
  CALL `project.etl.batch_process_orders`(process_date, actual_batch_size);
END;

9.3 테스트 및 검증

-- 데이터 파이프라인 테스트 프레임워크
CREATE OR REPLACE PROCEDURE `project.testing.run_etl_tests`()
BEGIN
  DECLARE test_results ARRAY<STRUCT<test_name STRING, status STRING, message STRING>>;
  
  -- 테스트 1: 데이터 볼륨 검증
  IF (SELECT COUNT(*) FROM `project.processed.daily_summary` 
      WHERE summary_date = CURRENT_DATE()) = 0 THEN
    SET test_results = ARRAY_CONCAT(test_results, [
      STRUCT('volume_check' as test_name, 'FAIL' as status, 'No data found for today' as message)
    ]);
  ELSE
    SET test_results = ARRAY_CONCAT(test_results, [
      STRUCT('volume_check' as test_name, 'PASS' as status, 'Data volume OK' as message)
    ]);
  END IF;
  
  -- 테스트 2: 데이터 품질 검증
  IF EXISTS (
    SELECT 1 FROM `project.processed.daily_summary`
    WHERE summary_date = CURRENT_DATE()
      AND (total_revenue < 0 OR customer_count < 0)
  ) THEN
    SET test_results = ARRAY_CONCAT(test_results, [
      STRUCT('quality_check' as test_name, 'FAIL' as status, 'Negative values found' as message)
    ]);
  ELSE
    SET test_results = ARRAY_CONCAT(test_results, [
      STRUCT('quality_check' as test_name, 'PASS' as status, 'Data quality OK' as message)
    ]);
  END IF;
  
  -- 테스트 결과 저장
  INSERT INTO `project.testing.test_results` (
    test_date,
    test_name,
    status,
    message,
    executed_at
  )
  SELECT 
    CURRENT_DATE(),
    result.test_name,
    result.status,
    result.message,
    CURRENT_TIMESTAMP()
  FROM UNNEST(test_results) as result;
  
  -- 실패한 테스트가 있으면 알림
  IF EXISTS (SELECT 1 FROM UNNEST(test_results) WHERE status = 'FAIL') THEN
    INSERT INTO `project.notifications.test_alerts` (
      alert_timestamp,
      failed_tests,
      test_summary
    )
    SELECT 
      CURRENT_TIMESTAMP(),
      ARRAY(SELECT test_name FROM UNNEST(test_results) WHERE status = 'FAIL'),
      STRING_AGG(CONCAT(test_name, ': ', message), '; ')
    FROM UNNEST(test_results);
  END IF;
END;

9.4 문서화 및 메타데이터 관리

-- 스케줄된 쿼리 메타데이터 관리
CREATE OR REPLACE TABLE `project.metadata.scheduled_queries` (
  query_name STRING,
  description STRING,
  schedule_expression STRING,
  owner_email STRING,
  dependencies ARRAY<STRING>,
  output_tables ARRAY<STRING>,
  business_purpose STRING,
  sla_hours INT64,
  created_date DATE,
  last_modified_date DATE,
  is_active BOOL
);

INSERT INTO `project.metadata.scheduled_queries` VALUES
(
  'daily_customer_summary',
  'Daily aggregation of customer metrics including orders, revenue, and engagement',
  'every day 02:00',
  'data-team@company.com',
  ['raw.orders', 'raw.customers'],
  ['marts.customer_daily_summary'],
  'Support customer analytics and segmentation for marketing team',
  4,  -- 4시간 SLA
  '2024-01-01',
  CURRENT_DATE(),
  true
);

-- 스케줄된 쿼리 인벤토리 뷰
CREATE OR REPLACE VIEW `project.metadata.query_inventory` AS
SELECT 
  sq.query_name,
  sq.description,
  sq.schedule_expression,
  sq.owner_email,
  sq.sla_hours,
  
  -- 최근 실행 정보 (INFORMATION_SCHEMA 조인)
  recent.last_execution,
  recent.last_status,
  recent.avg_duration_minutes,
  recent.total_executions_7days,
  recent.success_rate_7days
  
FROM `project.metadata.scheduled_queries` sq
LEFT JOIN (
  SELECT 
    'daily_customer_summary' as query_name,  -- 실제로는 job_id와 매핑 필요
    MAX(end_time) as last_execution,
    MAX(state) as last_status,
    AVG(TIMESTAMP_DIFF(end_time, start_time, SECOND)) / 60 as avg_duration_minutes,
    COUNT(*) as total_executions_7days,
    COUNT(CASE WHEN state = 'DONE' AND error_result IS NULL THEN 1 END) 
      / COUNT(*) * 100 as success_rate_7days
  FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
  WHERE DATE(creation_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
    AND job_type = 'QUERY'
  GROUP BY query_name
) recent ON sq.query_name = recent.query_name
WHERE sq.is_active = true;

9.5 운영 가이드라인

-- 운영 체크리스트 자동화
CREATE OR REPLACE PROCEDURE `project.operations.daily_health_check`()
BEGIN
  DECLARE health_summary STRING;
  
  -- 1. 스케줄된 쿼리 실행 상태 확인
  CREATE OR REPLACE TEMP TABLE daily_execution_status AS
  SELECT 
    'scheduled_queries' as check_category,
    COUNT(*) as total_jobs,
    COUNT(CASE WHEN state = 'DONE' AND error_result IS NULL THEN 1 END) as successful_jobs,
    COUNT(CASE WHEN state = 'RUNNING' THEN 1 END) as running_jobs,
    COUNT(CASE WHEN state = 'DONE' AND error_result IS NOT NULL THEN 1 END) as failed_jobs
  FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
  WHERE DATE(creation_time) = CURRENT_DATE()
    AND job_type = 'QUERY'
    AND user_email LIKE '%scheduled-query%';
  
  -- 2. 데이터 신선도 확인
  INSERT INTO daily_execution_status
  SELECT 
    'data_freshness' as check_category,
    COUNT(*) as total_tables,
    COUNT(CASE WHEN last_modified_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR) 
               THEN 1 END) as fresh_tables,
    0 as running_jobs,
    COUNT(CASE WHEN last_modified_time < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 48 HOUR) 
               THEN 1 END) as stale_tables
  FROM `project.marts.INFORMATION_SCHEMA.TABLES`
  WHERE table_type = 'BASE TABLE';
  
  -- 3. 리소스 사용량 확인
  INSERT INTO daily_execution_status
  SELECT 
    'resource_usage' as check_category,
    COUNT(*) as total_queries,
    COUNT(CASE WHEN total_slot_ms < 3600000 THEN 1 END) as normal_usage,  -- < 1시간
    0 as running_jobs,
    COUNT(CASE WHEN total_slot_ms >= 3600000 THEN 1 END) as high_usage     -- >= 1시간
  FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
  WHERE DATE(creation_time) = CURRENT_DATE();
  
  -- 4. 종합 상태 리포트 생성
  SET health_summary = (
    SELECT STRING_AGG(
      CONCAT(
        check_category, ': ',
        successful_jobs, '/', total_jobs, ' OK, ',
        failed_jobs, ' failed'
      ), 
      ' | '
    )
    FROM daily_execution_status
  );
  
  -- 5. 건강성 점수 계산
  CREATE OR REPLACE TABLE `project.operations.daily_health_score` AS
  SELECT 
    CURRENT_DATE() as check_date,
    health_summary,
    
    -- 종합 점수 (100점 만점)
    ROUND(
      (SUM(successful_jobs) / NULLIF(SUM(total_jobs), 0) * 50) +  -- 성공률 50점
      (COUNTIF(failed_jobs = 0) / COUNT(*) * 30) +                -- 무결점 30점  
      (COUNTIF(running_jobs = 0) / COUNT(*) * 20)                 -- 지연없음 20점
    ) as health_score,
    
    CASE 
      WHEN ROUND((SUM(successful_jobs) / NULLIF(SUM(total_jobs), 0) * 50) + 
                 (COUNTIF(failed_jobs = 0) / COUNT(*) * 30) +
                 (COUNTIF(running_jobs = 0) / COUNT(*) * 20)) >= 90 THEN 'EXCELLENT'
      WHEN ROUND((SUM(successful_jobs) / NULLIF(SUM(total_jobs), 0) * 50) + 
                 (COUNTIF(failed_jobs = 0) / COUNT(*) * 30) +
                 (COUNTIF(running_jobs = 0) / COUNT(*) * 20)) >= 70 THEN 'GOOD'
      WHEN ROUND((SUM(successful_jobs) / NULLIF(SUM(total_jobs), 0) * 50) + 
                 (COUNTIF(failed_jobs = 0) / COUNT(*) * 30) +
                 (COUNTIF(running_jobs = 0) / COUNT(*) * 20)) >= 50 THEN 'FAIR'
      ELSE 'POOR'
    END as health_status,
    
    CURRENT_TIMESTAMP() as generated_at
  FROM daily_execution_status;
  
  -- 6. 점수가 70점 미만이면 알림
  IF (SELECT health_score FROM `project.operations.daily_health_score` 
      WHERE check_date = CURRENT_DATE()) < 70 THEN
    INSERT INTO `project.notifications.operations_alerts` (
      alert_timestamp,
      alert_type,
      health_score,
      health_summary
    )
    SELECT 
      CURRENT_TIMESTAMP(),
      'LOW_HEALTH_SCORE',
      health_score,
      health_summary
    FROM `project.operations.daily_health_score`
    WHERE check_date = CURRENT_DATE();
  END IF;
END;

BigQuery 스케줄된 쿼리를 효과적으로 활용하면 복잡한 데이터 파이프라인을 자동화하고, 안정적인 데이터 운영 환경을 구축할 수 있습니다. 적절한 오류 처리, 모니터링, 테스트를 통해 신뢰할 수 있는 데이터 인프라를 만들어보세요.