데이터 거버넌스
데이터 거버넌스 체계
BigQuery에서 데이터 거버넌스, 품질 관리, 메타데이터 관리를 위한 종합 가이드입니다.
목차
1. 데이터 거버넌스 개요
1.1 데이터 거버넌스 프레임워크
데이터 거버넌스는 조직의 데이터 자산을 효율적으로 관리하고 활용하기 위한 체계입니다.
-- 데이터 거버넌스 정책 테이블
CREATE OR REPLACE TABLE `project.governance.data_policies` (
policy_id STRING,
policy_name STRING,
policy_type STRING, -- 'QUALITY', 'PRIVACY', 'RETENTION', 'ACCESS'
description STRING,
rules JSON,
owner STRING,
effective_date DATE,
review_date DATE,
status STRING
);
-- 데이터 자산 등록
CREATE OR REPLACE TABLE `project.governance.data_assets` (
asset_id STRING,
asset_name STRING,
asset_type STRING, -- 'TABLE', 'VIEW', 'DATASET'
description STRING,
data_owner STRING,
business_steward STRING,
sensitivity_level STRING, -- 'PUBLIC', 'INTERNAL', 'CONFIDENTIAL', 'RESTRICTED'
retention_period INT64,
created_at TIMESTAMP,
updated_at TIMESTAMP
);
1.2 데이터 소유권 및 책임
-- 데이터 소유권 매트릭스
WITH data_ownership AS (
SELECT
table_schema as dataset_name,
table_name,
'sales_team@company.com' as data_owner,
'data_analyst@company.com' as data_steward,
'HIGH' as business_criticality
FROM `project.sales.INFORMATION_SCHEMA.TABLES`
UNION ALL
SELECT
table_schema,
table_name,
'hr_team@company.com',
'hr_analyst@company.com',
'CRITICAL'
FROM `project.hr.INFORMATION_SCHEMA.TABLES`
)
SELECT
dataset_name,
COUNT(*) as table_count,
data_owner,
data_steward,
business_criticality
FROM data_ownership
GROUP BY dataset_name, data_owner, data_steward, business_criticality;
2. 데이터 품질 관리
2.1 데이터 품질 측정
-- 종합적인 데이터 품질 평가
CREATE OR REPLACE PROCEDURE `project.governance.assess_data_quality`(
target_table STRING
)
BEGIN
DECLARE table_name STRING;
DECLARE dataset_name STRING;
SET table_name = SPLIT(target_table, '.')[OFFSET(2)];
SET dataset_name = SPLIT(target_table, '.')[OFFSET(1)];
-- 데이터 품질 메트릭 계산
EXECUTE IMMEDIATE FORMAT("""
INSERT INTO `project.governance.quality_metrics` (
table_name,
assessment_date,
total_records,
null_percentage,
duplicate_percentage,
freshness_hours,
completeness_score,
consistency_score,
accuracy_score,
overall_quality_score
)
WITH quality_assessment AS (
SELECT
'%s' as table_name,
CURRENT_DATE() as assessment_date,
COUNT(*) as total_records,
-- NULL 비율
ROUND(
(SELECT COUNT(*) FROM `%s` WHERE %s IS NULL) / COUNT(*) * 100, 2
) as null_percentage,
-- 중복 비율 (기본키 기준)
ROUND(
(COUNT(*) - COUNT(DISTINCT %s)) / COUNT(*) * 100, 2
) as duplicate_percentage,
-- 데이터 신선도 (시간 컬럼 기준)
TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX(%s), HOUR) as freshness_hours
FROM `%s`
),
scoring AS (
SELECT
*,
-- 완성도 점수 (100 - null_percentage)
100 - null_percentage as completeness_score,
-- 일관성 점수 (100 - duplicate_percentage)
100 - duplicate_percentage as consistency_score,
-- 정확성 점수 (데이터 패턴 기반)
CASE
WHEN freshness_hours <= 24 THEN 100
WHEN freshness_hours <= 48 THEN 80
WHEN freshness_hours <= 72 THEN 60
ELSE 40
END as accuracy_score
FROM quality_assessment
)
SELECT
*,
ROUND((completeness_score + consistency_score + accuracy_score) / 3, 2) as overall_quality_score
FROM scoring
""",
table_name, target_table, 'id', 'id', 'updated_at', target_table);
END;
2.2 자동 품질 검증
-- 데이터 품질 규칙 엔진
CREATE OR REPLACE TABLE `project.governance.quality_rules` (
rule_id STRING,
table_name STRING,
column_name STRING,
rule_type STRING,
rule_condition STRING,
expected_value STRING,
severity STRING, -- 'ERROR', 'WARNING', 'INFO'
is_active BOOL
);
-- 품질 규칙 실행
CREATE OR REPLACE PROCEDURE `project.governance.run_quality_checks`()
BEGIN
FOR rule IN (
SELECT * FROM `project.governance.quality_rules`
WHERE is_active = true
) DO
CASE rule.rule_type
WHEN 'NOT_NULL' THEN
EXECUTE IMMEDIATE FORMAT("""
INSERT INTO `project.governance.quality_violations`
SELECT
'%s' as rule_id,
CURRENT_TIMESTAMP() as check_time,
'%s' as table_name,
'%s' as column_name,
'NOT_NULL_VIOLATION' as violation_type,
COUNT(*) as violation_count,
'%s' as severity
FROM `%s.%s`
WHERE %s IS NULL
HAVING COUNT(*) > 0
""", rule.rule_id, rule.table_name, rule.column_name,
rule.severity, 'project.data', rule.table_name, rule.column_name);
WHEN 'RANGE_CHECK' THEN
EXECUTE IMMEDIATE FORMAT("""
INSERT INTO `project.governance.quality_violations`
SELECT
'%s' as rule_id,
CURRENT_TIMESTAMP(),
'%s',
'%s',
'RANGE_VIOLATION',
COUNT(*),
'%s'
FROM `%s.%s`
WHERE NOT (%s)
HAVING COUNT(*) > 0
""", rule.rule_id, rule.table_name, rule.column_name,
rule.severity, 'project.data', rule.table_name, rule.rule_condition);
WHEN 'REFERENTIAL_INTEGRITY' THEN
-- 참조 무결성 검사
EXECUTE IMMEDIATE FORMAT("""
INSERT INTO `project.governance.quality_violations`
SELECT
'%s',
CURRENT_TIMESTAMP(),
'%s',
'%s',
'REFERENTIAL_INTEGRITY_VIOLATION',
COUNT(*),
'%s'
FROM `%s.%s` t1
LEFT JOIN `%s` t2 ON %s
WHERE t2.%s IS NULL AND t1.%s IS NOT NULL
HAVING COUNT(*) > 0
""", rule.rule_id, rule.table_name, rule.column_name, rule.severity,
'project.data', rule.table_name, rule.expected_value,
SPLIT(rule.expected_value, '.')[OFFSET(2)],
rule.column_name, rule.column_name);
END CASE;
END FOR;
END;
3. 메타데이터 관리
3.1 자동 메타데이터 수집
-- 테이블 메타데이터 자동 수집
CREATE OR REPLACE VIEW `project.governance.table_metadata` AS
SELECT
t.table_catalog as project_id,
t.table_schema as dataset_id,
t.table_name,
t.table_type,
t.creation_time,
t.ddl,
-- 테이블 통계
COALESCE(ts.row_count, 0) as row_count,
COALESCE(ts.size_bytes, 0) as size_bytes,
ROUND(COALESCE(ts.size_bytes, 0) / 1024 / 1024 / 1024, 2) as size_gb,
-- 파티션 정보
p.partition_id,
p.partition_type,
-- 클러스터링 정보
ARRAY_AGG(
DISTINCT c.clustering_column_name
ORDER BY c.clustering_ordinal_position
) as clustering_columns,
-- 컬럼 수
(SELECT COUNT(*)
FROM `project.INFORMATION_SCHEMA.COLUMNS` col
WHERE col.table_name = t.table_name
AND col.table_schema = t.table_schema) as column_count
FROM `project.INFORMATION_SCHEMA.TABLES` t
LEFT JOIN `project.INFORMATION_SCHEMA.TABLE_STORAGE` ts
ON t.table_name = ts.table_name
AND t.table_schema = ts.table_schema
LEFT JOIN `project.INFORMATION_SCHEMA.PARTITIONS` p
ON t.table_name = p.table_name
AND t.table_schema = p.table_schema
LEFT JOIN `project.INFORMATION_SCHEMA.CLUSTERING_COLUMNS` c
ON t.table_name = c.table_name
AND t.table_schema = c.table_schema
WHERE t.table_type IN ('BASE TABLE', 'VIEW')
GROUP BY
t.table_catalog, t.table_schema, t.table_name, t.table_type,
t.creation_time, t.ddl, ts.row_count, ts.size_bytes,
p.partition_id, p.partition_type;
3.2 컬럼 프로파일링
-- 컬럼 데이터 프로파일링
CREATE OR REPLACE PROCEDURE `project.governance.profile_columns`(
target_table STRING
)
BEGIN
DECLARE col_name STRING;
DECLARE col_type STRING;
FOR column_info IN (
SELECT column_name, data_type
FROM `project.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = SPLIT(target_table, '.')[OFFSET(2)]
AND table_schema = SPLIT(target_table, '.')[OFFSET(1)]
) DO
SET col_name = column_info.column_name;
SET col_type = column_info.data_type;
-- 수치형 컬럼 프로파일링
IF col_type IN ('INT64', 'FLOAT64', 'NUMERIC') THEN
EXECUTE IMMEDIATE FORMAT("""
INSERT INTO `project.governance.column_profiles`
SELECT
'%s' as table_name,
'%s' as column_name,
'%s' as data_type,
COUNT(*) as total_count,
COUNT(%s) as non_null_count,
COUNT(DISTINCT %s) as distinct_count,
CAST(MIN(%s) AS STRING) as min_value,
CAST(MAX(%s) AS STRING) as max_value,
CAST(AVG(%s) AS STRING) as avg_value,
CAST(STDDEV(%s) AS STRING) as stddev_value,
CURRENT_TIMESTAMP() as profiled_at
FROM `%s`
""", target_table, col_name, col_type, col_name, col_name,
col_name, col_name, col_name, col_name, target_table);
-- 문자형 컬럼 프로파일링
ELSEIF col_type IN ('STRING', 'BYTES') THEN
EXECUTE IMMEDIATE FORMAT("""
INSERT INTO `project.governance.column_profiles`
SELECT
'%s' as table_name,
'%s' as column_name,
'%s' as data_type,
COUNT(*) as total_count,
COUNT(%s) as non_null_count,
COUNT(DISTINCT %s) as distinct_count,
CAST(MIN(LENGTH(%s)) AS STRING) as min_value,
CAST(MAX(LENGTH(%s)) AS STRING) as max_value,
CAST(AVG(LENGTH(%s)) AS STRING) as avg_value,
NULL as stddev_value,
CURRENT_TIMESTAMP() as profiled_at
FROM `%s`
""", target_table, col_name, col_type, col_name, col_name,
col_name, col_name, col_name, target_table);
END IF;
END FOR;
END;
4. 데이터 계보 추적
4.1 데이터 계보 모델링
-- 데이터 계보 추적 테이블
CREATE OR REPLACE TABLE `project.governance.data_lineage` (
lineage_id STRING,
source_table STRING,
target_table STRING,
transformation_type STRING, -- 'SELECT', 'JOIN', 'AGGREGATE', 'UNION'
transformation_logic STRING,
dependency_type STRING, -- 'DIRECT', 'INDIRECT'
created_by STRING,
created_at TIMESTAMP,
is_active BOOL
);
-- 계보 관계 자동 생성
CREATE OR REPLACE PROCEDURE `project.governance.extract_lineage_from_view`(
view_name STRING
)
BEGIN
DECLARE view_ddl STRING;
DECLARE source_tables ARRAY<STRING>;
-- 뷰 DDL 조회
SET view_ddl = (
SELECT ddl
FROM `project.INFORMATION_SCHEMA.VIEWS`
WHERE table_name = SPLIT(view_name, '.')[OFFSET(2)]
AND table_schema = SPLIT(view_name, '.')[OFFSET(1)]
);
-- DDL에서 소스 테이블 추출 (간단한 패턴 매칭)
SET source_tables = (
SELECT ARRAY_AGG(DISTINCT
REGEXP_EXTRACT(view_ddl, r'FROM `([^`]+)`')
)
);
-- 계보 관계 삽입
FOR source_table IN UNNEST(source_tables) DO
IF source_table IS NOT NULL THEN
INSERT INTO `project.governance.data_lineage` VALUES (
GENERATE_UUID(),
source_table,
view_name,
'SELECT',
view_ddl,
'DIRECT',
SESSION_USER(),
CURRENT_TIMESTAMP(),
true
);
END IF;
END FOR;
END;
4.2 영향도 분석
-- 데이터 변경 영향도 분석
CREATE OR REPLACE FUNCTION `project.governance.get_downstream_tables`(
source_table STRING
) RETURNS ARRAY<STRING>
LANGUAGE SQL AS (
WITH RECURSIVE lineage_tree AS (
-- 기본 케이스: 직접 의존성
SELECT target_table, 1 as level
FROM `project.governance.data_lineage`
WHERE source_table = source_table AND is_active = true
UNION ALL
-- 재귀 케이스: 간접 의존성
SELECT l.target_table, lt.level + 1
FROM `project.governance.data_lineage` l
JOIN lineage_tree lt ON l.source_table = lt.target_table
WHERE l.is_active = true AND lt.level < 10 -- 무한 루프 방지
)
SELECT ARRAY_AGG(DISTINCT target_table)
FROM lineage_tree
);
-- 테이블 변경 시 영향받는 자산 조회
SELECT
'customers' as source_table,
`project.governance.get_downstream_tables`('project.raw.customers') as affected_tables;
5. 데이터 카탈로그
5.1 비즈니스 용어집
-- 비즈니스 용어집 관리
CREATE OR REPLACE TABLE `project.governance.business_glossary` (
term_id STRING,
business_term STRING,
definition STRING,
technical_name STRING,
data_domain STRING,
synonyms ARRAY<STRING>,
related_terms ARRAY<STRING>,
usage_examples STRING,
owner STRING,
approved_by STRING,
version INT64,
status STRING,
created_at TIMESTAMP,
updated_at TIMESTAMP
);
-- 용어와 데이터 자산 매핑
CREATE OR REPLACE TABLE `project.governance.term_asset_mapping` (
mapping_id STRING,
term_id STRING,
asset_type STRING, -- 'TABLE', 'COLUMN', 'VIEW'
asset_path STRING, -- 'project.dataset.table' or 'project.dataset.table.column'
mapping_type STRING, -- 'PRIMARY', 'SECONDARY'
confidence_score FLOAT64,
created_at TIMESTAMP
);
5.2 자동 태깅 시스템
-- 자동 데이터 분류 및 태깅
CREATE OR REPLACE PROCEDURE `project.governance.auto_classify_tables`()
BEGIN
-- PII 데이터 감지 및 태깅
INSERT INTO `project.governance.data_tags` (
asset_path,
tag_key,
tag_value,
confidence_score,
detection_method,
created_at
)
SELECT
CONCAT(table_schema, '.', table_name, '.', column_name) as asset_path,
'DATA_CLASSIFICATION' as tag_key,
'PII' as tag_value,
0.9 as confidence_score,
'PATTERN_MATCHING' as detection_method,
CURRENT_TIMESTAMP()
FROM `project.INFORMATION_SCHEMA.COLUMNS`
WHERE LOWER(column_name) LIKE '%email%'
OR LOWER(column_name) LIKE '%phone%'
OR LOWER(column_name) LIKE '%ssn%'
OR LOWER(column_name) LIKE '%credit_card%'
OR LOWER(column_name) LIKE '%address%';
-- 재무 데이터 감지
INSERT INTO `project.governance.data_tags`
SELECT
CONCAT(table_schema, '.', table_name, '.', column_name),
'DATA_DOMAIN',
'FINANCIAL',
0.8,
'PATTERN_MATCHING',
CURRENT_TIMESTAMP()
FROM `project.INFORMATION_SCHEMA.COLUMNS`
WHERE LOWER(column_name) LIKE '%amount%'
OR LOWER(column_name) LIKE '%price%'
OR LOWER(column_name) LIKE '%cost%'
OR LOWER(column_name) LIKE '%revenue%'
OR LOWER(column_name) LIKE '%salary%';
-- 시간 데이터 감지
INSERT INTO `project.governance.data_tags`
SELECT
CONCAT(table_schema, '.', table_name, '.', column_name),
'DATA_TYPE',
'TEMPORAL',
1.0,
'SCHEMA_ANALYSIS',
CURRENT_TIMESTAMP()
FROM `project.INFORMATION_SCHEMA.COLUMNS`
WHERE data_type IN ('TIMESTAMP', 'DATETIME', 'DATE', 'TIME');
END;
6. 실제 활용 사례
6.1 규제 준수 모니터링
-- GDPR 준수를 위한 개인정보 사용 추적
CREATE OR REPLACE VIEW `project.governance.gdpr_compliance_dashboard` AS
WITH pii_tables AS (
SELECT
CONCAT(table_schema, '.', table_name) as table_path,
COUNT(*) as pii_columns
FROM `project.INFORMATION_SCHEMA.COLUMNS` c
JOIN `project.governance.data_tags` t
ON CONCAT(c.table_schema, '.', c.table_name, '.', c.column_name) = t.asset_path
WHERE t.tag_key = 'DATA_CLASSIFICATION' AND t.tag_value = 'PII'
GROUP BY table_path
),
usage_tracking AS (
SELECT
referenced_table,
COUNT(*) as query_count,
COUNT(DISTINCT user_email) as user_count,
MAX(creation_time) as last_access
FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE DATE(creation_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
AND referenced_table IS NOT NULL
GROUP BY referenced_table
)
SELECT
p.table_path,
p.pii_columns,
COALESCE(u.query_count, 0) as queries_last_30_days,
COALESCE(u.user_count, 0) as users_last_30_days,
u.last_access,
-- 데이터 보존 기간 확인
da.retention_period,
CASE
WHEN da.retention_period IS NULL THEN 'NO_RETENTION_POLICY'
WHEN DATE_ADD(da.created_at, INTERVAL da.retention_period DAY) < CURRENT_DATE()
THEN 'RETENTION_EXPIRED'
ELSE 'COMPLIANT'
END as retention_status
FROM pii_tables p
LEFT JOIN usage_tracking u ON p.table_path = u.referenced_table
LEFT JOIN `project.governance.data_assets` da ON p.table_path = da.asset_name
ORDER BY p.pii_columns DESC, queries_last_30_days DESC;
6.2 데이터 품질 대시보드
-- 실시간 데이터 품질 대시보드
CREATE OR REPLACE VIEW `project.governance.quality_dashboard` AS
WITH quality_summary AS (
SELECT
DATE(assessment_date) as quality_date,
COUNT(*) as tables_assessed,
AVG(overall_quality_score) as avg_quality_score,
COUNT(CASE WHEN overall_quality_score >= 90 THEN 1 END) as excellent_tables,
COUNT(CASE WHEN overall_quality_score BETWEEN 70 AND 89 THEN 1 END) as good_tables,
COUNT(CASE WHEN overall_quality_score < 70 THEN 1 END) as poor_tables
FROM `project.governance.quality_metrics`
WHERE assessment_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY DATE(assessment_date)
),
violation_summary AS (
SELECT
DATE(check_time) as violation_date,
severity,
COUNT(*) as violation_count
FROM `project.governance.quality_violations`
WHERE DATE(check_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY DATE(check_time), severity
)
SELECT
qs.quality_date,
qs.tables_assessed,
ROUND(qs.avg_quality_score, 2) as avg_quality_score,
qs.excellent_tables,
qs.good_tables,
qs.poor_tables,
-- 위반 사항 요약
COALESCE(vs_error.violation_count, 0) as error_violations,
COALESCE(vs_warning.violation_count, 0) as warning_violations,
-- 트렌드
LAG(qs.avg_quality_score) OVER (ORDER BY qs.quality_date) as prev_day_score,
ROUND(qs.avg_quality_score - LAG(qs.avg_quality_score) OVER (ORDER BY qs.quality_date), 2) as score_change
FROM quality_summary qs
LEFT JOIN violation_summary vs_error
ON qs.quality_date = vs_error.violation_date AND vs_error.severity = 'ERROR'
LEFT JOIN violation_summary vs_warning
ON qs.quality_date = vs_warning.violation_date AND vs_warning.severity = 'WARNING'
ORDER BY qs.quality_date DESC;
6.3 자동화된 거버넌스 워크플로우
-- 일일 거버넌스 체크 프로시저
CREATE OR REPLACE PROCEDURE `project.governance.daily_governance_check`()
BEGIN
-- 1. 새로 생성된 테이블 감지 및 분류
CALL `project.governance.auto_classify_tables`();
-- 2. 데이터 품질 평가 실행
FOR table_record IN (
SELECT CONCAT(table_schema, '.', table_name) as full_table_name
FROM `project.INFORMATION_SCHEMA.TABLES`
WHERE table_type = 'BASE TABLE'
AND creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
) DO
CALL `project.governance.assess_data_quality`(table_record.full_table_name);
END FOR;
-- 3. 품질 규칙 검증
CALL `project.governance.run_quality_checks`();
-- 4. 보존 정책 준수 확인
INSERT INTO `project.governance.retention_violations` (
table_name,
retention_period_days,
oldest_record_date,
days_overdue,
violation_date
)
SELECT
da.asset_name,
da.retention_period,
(SELECT MIN(DATE(created_at)) FROM `project.data.*`), -- 동적 쿼리 필요
DATE_DIFF(
CURRENT_DATE(),
DATE_ADD((SELECT MIN(DATE(created_at)) FROM `project.data.*`), INTERVAL da.retention_period DAY),
DAY
) as days_overdue,
CURRENT_DATE()
FROM `project.governance.data_assets` da
WHERE da.retention_period IS NOT NULL
AND DATE_ADD(da.created_at, INTERVAL da.retention_period DAY) < CURRENT_DATE();
-- 5. 거버넌스 알림 생성
INSERT INTO `project.governance.governance_alerts` (
alert_type,
alert_message,
severity,
alert_date
)
SELECT
'QUALITY_DEGRADATION',
CONCAT('Data quality score dropped below 70 for table: ', table_name),
'HIGH',
CURRENT_DATE()
FROM `project.governance.quality_metrics`
WHERE assessment_date = CURRENT_DATE()
AND overall_quality_score < 70;
END;
BigQuery를 통한 데이터 거버넌스는 조직의 데이터 자산을 체계적으로 관리하고 품질을 보장하는 핵심 요소입니다. 자동화된 품질 검증, 메타데이터 관리, 계보 추적을 통해 신뢰할 수 있는 데이터 환경을 구축할 수 있습니다.