보안 관리

보안 정책 및 권한 관리

BigQuery에서 데이터 보안, 접근 제어, 개인정보 보호를 위한 종합적인 보안 관리 방법을 다루는 가이드입니다.


목차

  1. BigQuery 보안 개요
  2. IAM 권한 관리
  3. 데이터세트 및 테이블 보안
  4. 행 레벨 보안 (RLS)
  5. 컬럼 레벨 보안 (CLS)
  6. 데이터 분류 및 태깅
  7. 암호화 및 키 관리
  8. 감사 로그 및 모니터링
  9. 개인정보 보호 (Privacy)
  10. 보안 모범 사례

1. BigQuery 보안 개요

1.1 보안 계층 구조

BigQuery는 다중 계층 보안 모델을 제공합니다:

┌─ Organization Level (조직)
│  ├─ Project Level (프로젝트)  
│  │  ├─ Dataset Level (데이터세트)
│  │  │  ├─ Table/View Level (테이블/뷰)
│  │  │  │  ├─ Row Level (행 레벨)
│  │  │  │  └─ Column Level (컬럼 레벨)

1.2 주요 보안 기능

  • Identity and Access Management (IAM): 역할 기반 접근 제어
  • Row-Level Security (RLS): 행 단위 접근 제어
  • Column-Level Security (CLS): 컬럼 단위 접근 제어
  • Data Classification: 자동 데이터 분류 및 태깅
  • Encryption: 전송 중/저장 중 암호화
  • Audit Logs: 모든 접근 및 작업 감사
  • VPC Service Controls: 네트워크 수준 보안

2. IAM 권한 관리

2.1 기본 역할 (Primitive Roles)

# 프로젝트 수준 기본 역할
# Viewer (뷰어) - 읽기 전용
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="user:viewer@company.com" \
  --role="roles/viewer"

# Editor (편집자) - 읽기/쓰기
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="user:editor@company.com" \
  --role="roles/editor"

# Owner (소유자) - 모든 권한
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="user:owner@company.com" \
  --role="roles/owner"

2.2 사전 정의된 BigQuery 역할

# BigQuery 관리자 - 모든 BigQuery 리소스 관리
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="user:admin@company.com" \
  --role="roles/bigquery.admin"

# BigQuery 사용자 - 쿼리 실행, 데이터세트/테이블 생성
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="user:analyst@company.com" \
  --role="roles/bigquery.user"

# BigQuery 데이터 뷰어 - 데이터 읽기 전용
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="user:viewer@company.com" \
  --role="roles/bigquery.dataViewer"

# BigQuery 데이터 편집자 - 데이터 읽기/쓰기
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="user:editor@company.com" \
  --role="roles/bigquery.dataEditor"

# BigQuery 작업 사용자 - 쿼리 실행만 가능
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="user:query-runner@company.com" \
  --role="roles/bigquery.jobUser"

2.3 커스텀 역할 생성

# custom-bigquery-analyst.yaml
title: "Custom BigQuery Analyst"
description: "Limited BigQuery access for analysts"
stage: "GA"
includedPermissions:
  # 쿼리 관련
  - bigquery.jobs.create
  - bigquery.jobs.get
  - bigquery.jobs.list
  
  # 데이터 읽기
  - bigquery.tables.get
  - bigquery.tables.getData
  - bigquery.tables.list
  
  # 스키마 정보
  - bigquery.datasets.get
  - bigquery.routines.get
  - bigquery.routines.list
  
  # 뷰 생성 (임시 분석용)
  - bigquery.tables.create
  - bigquery.tables.update
  - bigquery.tables.delete
# 커스텀 역할 생성
gcloud iam roles create customBigqueryAnalyst \
  --project=PROJECT_ID \
  --file=custom-bigquery-analyst.yaml

# 역할 할당
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="group:analysts@company.com" \
  --role="projects/PROJECT_ID/roles/customBigqueryAnalyst"

2.4 서비스 계정 보안

# 서비스 계정 생성
gcloud iam service-accounts create bigquery-etl-service \
  --description="ETL pipeline service account" \
  --display-name="BigQuery ETL Service"

# 최소 권한 할당
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:bigquery-etl-service@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/bigquery.dataEditor"

# 특정 데이터세트에만 권한 부여
bq add-iam-policy-binding \
  --member="serviceAccount:bigquery-etl-service@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/bigquery.dataEditor" \
  PROJECT_ID:etl_dataset

# 키 회전 설정 (90일마다)
gcloud iam service-accounts keys create key.json \
  --iam-account=bigquery-etl-service@PROJECT_ID.iam.gserviceaccount.com \
  --key-file-type=json

3. 데이터세트 및 테이블 보안

3.1 데이터세트 수준 권한

-- 데이터세트 생성 시 접근 제어 설정
CREATE SCHEMA `project.sensitive_data`
OPTIONS (
  description = "Sensitive customer data with restricted access",
  location = "US",
  default_table_expiration_days = 365
);
# CLI를 통한 데이터세트 권한 설정
# 특정 그룹에 읽기 권한 부여
bq add-iam-policy-binding \
  --member="group:analysts@company.com" \
  --role="roles/bigquery.dataViewer" \
  PROJECT_ID:sensitive_data

# 개별 사용자에게 관리 권한
bq add-iam-policy-binding \
  --member="user:data-owner@company.com" \
  --role="roles/bigquery.dataOwner" \
  PROJECT_ID:sensitive_data

# 권한 확인
bq get-iam-policy PROJECT_ID:sensitive_data

3.2 테이블 수준 권한

-- 테이블별 세밀한 권한 제어
-- 특정 테이블에만 접근 가능한 뷰 생성
CREATE VIEW `project.public_views.customer_summary` AS
SELECT 
  customer_id,
  customer_name,
  registration_date,
  total_orders,
  -- 민감한 정보는 제외
  -- email, phone, address 제외
FROM `project.sensitive_data.customers`
WHERE status = 'active';

-- 뷰에 대한 권한 부여
GRANT `roles/bigquery.dataViewer` ON TABLE `project.public_views.customer_summary`
TO 'group:marketing@company.com';

3.3 임시 테이블 보안

-- 임시 테이블 생성 시 보안 설정
CREATE TEMP TABLE temp_analysis AS
SELECT 
  anonymized_user_id,  -- 익명화된 ID 사용
  DATE_TRUNC(event_date, WEEK) as week,  -- 날짜 일반화
  event_count
FROM `project.raw_data.user_events`
WHERE DATE(event_date) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY);

-- 세션 종료 시 자동 삭제됨

4. 행 레벨 보안 (RLS)

4.1 행 레벨 보안 정책 생성

-- 1. 행 레벨 보안 정책 테이블 생성
CREATE OR REPLACE TABLE `project.security.row_access_policies` (
  user_email STRING,
  allowed_regions ARRAY<STRING>,
  allowed_departments ARRAY<STRING>,
  data_access_level STRING,
  effective_from DATE,
  effective_to DATE
);

-- 정책 데이터 삽입
INSERT INTO `project.security.row_access_policies` VALUES
('analyst1@company.com', ['US', 'CA'], ['Sales', 'Marketing'], 'STANDARD', '2024-01-01', '2024-12-31'),
('manager1@company.com', ['US', 'CA', 'EU'], ['Sales', 'Marketing', 'Support'], 'ELEVATED', '2024-01-01', '2024-12-31'),
('admin1@company.com', ['US', 'CA', 'EU', 'APAC'], ['Sales', 'Marketing', 'Support', 'Engineering'], 'FULL', '2024-01-01', '2024-12-31');

-- 2. 행 레벨 보안 함수 생성
CREATE OR REPLACE FUNCTION `project.security.check_row_access`(
  user_region STRING,
  user_department STRING,
  data_sensitivity STRING
)
RETURNS BOOL
LANGUAGE SQL
AS (
  EXISTS (
    SELECT 1 
    FROM `project.security.row_access_policies` p
    WHERE p.user_email = SESSION_USER()
      AND user_region IN UNNEST(p.allowed_regions)
      AND user_department IN UNNEST(p.allowed_departments)
      AND (
        data_sensitivity = 'PUBLIC' OR
        (data_sensitivity = 'INTERNAL' AND p.data_access_level IN ('ELEVATED', 'FULL')) OR
        (data_sensitivity = 'CONFIDENTIAL' AND p.data_access_level = 'FULL')
      )
      AND CURRENT_DATE() BETWEEN p.effective_from AND p.effective_to
  )
);

-- 3. 행 필터 정책 적용
CREATE OR REPLACE ROW ACCESS POLICY regional_department_filter
ON `project.data.employee_records`
GRANT TO ('group:all-users@company.com')
FILTER USING (
  `project.security.check_row_access`(region, department, sensitivity_level)
);

4.2 동적 행 레벨 보안

-- 사용자별 동적 데이터 필터링
CREATE OR REPLACE ROW ACCESS POLICY user_data_filter
ON `project.data.user_transactions`
GRANT TO ('group:analysts@company.com')
FILTER USING (
  -- 사용자는 자신의 데이터만 볼 수 있음
  user_id IN (
    SELECT allowed_user_id 
    FROM `project.security.user_data_access`
    WHERE analyst_email = SESSION_USER()
      AND access_granted = true
      AND CURRENT_DATE() BETWEEN start_date AND end_date
  )
);

-- 시간 기반 접근 제한
CREATE OR REPLACE ROW ACCESS POLICY time_based_filter  
ON `project.data.real_time_events`
GRANT TO ('group:business-users@company.com')
FILTER USING (
  -- 업무 시간에만 접근 가능
  EXTRACT(HOUR FROM CURRENT_DATETIME()) BETWEEN 8 AND 18
  AND EXTRACT(DAYOFWEEK FROM CURRENT_DATE()) BETWEEN 2 AND 6
);

4.3 행 레벨 보안 관리

-- 현재 적용된 행 접근 정책 확인
SELECT 
  row_access_policy_name,
  grantee_list,
  filter_predicate,
  creation_time,
  last_modified_time
FROM `project.data.INFORMATION_SCHEMA.ROW_ACCESS_POLICIES`
WHERE table_name = 'sensitive_table';

-- 정책 비활성화
DROP ROW ACCESS POLICY IF EXISTS regional_department_filter
ON `project.data.employee_records`;

-- 정책 수정
CREATE OR REPLACE ROW ACCESS POLICY updated_filter
ON `project.data.employee_records`
GRANT TO ('group:hr-team@company.com')
FILTER USING (
  department IN ('HR', 'Legal') OR 
  employee_level >= 'SENIOR'
);

5. 컬럼 레벨 보안 (CLS)

5.1 컬럼 레벨 보안 정책

-- 1. 민감한 데이터가 포함된 테이블 생성
CREATE OR REPLACE TABLE `project.hr.employee_data` (
  employee_id STRING,
  first_name STRING,
  last_name STRING,
  email STRING,
  ssn STRING,           -- 민감 정보
  salary NUMERIC,       -- 민감 정보  
  performance_rating STRING, -- 민감 정보
  department STRING,
  hire_date DATE,
  manager_id STRING
);

-- 2. 컬럼 접근 정책 생성
CREATE OR REPLACE COLUMN ACCESS POLICY pii_access_policy
ON `project.hr.employee_data`
FOR COLUMNS (ssn, salary, performance_rating)
GRANT TO ('group:hr-managers@company.com', 'group:payroll@company.com')
FILTER USING (
  -- 추가 조건: 본인 데이터이거나 관리자인 경우
  employee_id = (
    SELECT emp_id FROM `project.hr.user_mapping` 
    WHERE email = SESSION_USER()
  )
  OR 
  SESSION_USER() IN (
    SELECT manager_email FROM `project.hr.managers`
    WHERE department = (
      SELECT dept FROM `project.hr.employee_data` e 
      WHERE e.employee_id = employee_id
    )
  )
);

-- 3. 일반 사용자용 뷰 생성 (민감 정보 마스킹)
CREATE OR REPLACE VIEW `project.hr.employee_directory` AS
SELECT 
  employee_id,
  first_name,
  last_name,
  CASE 
    WHEN SESSION_USER() IN (
      SELECT email FROM `project.hr.authorized_users` 
      WHERE access_level >= 'MANAGER'
    )
    THEN email
    ELSE CONCAT(SUBSTR(email, 1, 3), '***@', 
         SUBSTR(email, STRPOS(email, '@') + 1, LENGTH(email)))
  END as email,
  '***-**-****' as ssn_masked,  -- SSN 마스킹
  CASE 
    WHEN SESSION_USER() IN (SELECT email FROM `project.hr.salary_viewers`)
    THEN CAST(salary AS STRING)
    ELSE 'CONFIDENTIAL'
  END as salary_info,
  department,
  hire_date
FROM `project.hr.employee_data`;

5.2 동적 데이터 마스킹

-- UDF를 활용한 동적 마스킹
CREATE OR REPLACE FUNCTION `project.security.mask_pii`(
  data STRING,
  data_type STRING,
  user_role STRING
)
RETURNS STRING
LANGUAGE SQL
AS (
  CASE 
    WHEN user_role = 'ADMIN' THEN data
    WHEN data_type = 'EMAIL' THEN 
      CONCAT(SUBSTR(data, 1, 2), '***@', SUBSTR(data, STRPOS(data, '@') + 1, LENGTH(data)))
    WHEN data_type = 'PHONE' THEN 
      CONCAT('***-***-', SUBSTR(data, -4, 4))
    WHEN data_type = 'SSN' THEN 
      '***-**-****'
    WHEN data_type = 'CREDIT_CARD' THEN 
      CONCAT('****-****-****-', SUBSTR(data, -4, 4))
    ELSE 'REDACTED'
  END
);

-- 마스킹 적용 뷰
CREATE OR REPLACE VIEW `project.public.customer_info` AS
SELECT 
  customer_id,
  customer_name,
  `project.security.mask_pii`(
    email, 'EMAIL', 
    (SELECT role FROM `project.security.user_roles` 
     WHERE user_email = SESSION_USER())
  ) as email,
  `project.security.mask_pii`(
    phone, 'PHONE',
    (SELECT role FROM `project.security.user_roles` 
     WHERE user_email = SESSION_USER())
  ) as phone,
  registration_date
FROM `project.raw.customers`;

5.3 조건부 컬럼 접근

-- 지역/부서별 컬럼 접근 제한
CREATE OR REPLACE VIEW `project.analytics.sales_data` AS
SELECT 
  order_id,
  product_id,
  quantity,
  -- 지역별 가격 정보 접근 제한
  CASE 
    WHEN SESSION_USER() IN (
      SELECT user_email FROM `project.security.regional_access`
      WHERE allowed_regions LIKE '%' || 
        (SELECT region FROM `project.raw.orders` o WHERE o.order_id = order_id) || '%'
    )
    THEN price
    ELSE NULL
  END as price,
  -- 부서별 고객 정보 접근 제한  
  CASE
    WHEN SESSION_USER() IN (
      SELECT user_email FROM `project.security.department_access`
      WHERE department IN ('Sales', 'Marketing', 'Customer Service')
    )
    THEN customer_id
    ELSE 'REDACTED'
  END as customer_id,
  order_date
FROM `project.raw.orders`;

6. 데이터 분류 및 태깅

6.1 자동 데이터 분류 설정

# Data Loss Prevention (DLP) API 활성화
gcloud services enable dlp.googleapis.com

# 데이터 분류 스캔 작업 생성
cat > dlp_bigquery_scan.json << EOF
{
  "parent": "projects/PROJECT_ID/locations/LOCATION",
  "inspectJob": {
    "storageConfig": {
      "bigQueryOptions": {
        "tableReference": {
          "projectId": "PROJECT_ID",
          "datasetId": "DATASET_ID",
          "tableId": "TABLE_ID"
        }
      }
    },
    "inspectConfig": {
      "infoTypes": [
        {"name": "EMAIL_ADDRESS"},
        {"name": "PHONE_NUMBER"}, 
        {"name": "CREDIT_CARD_NUMBER"},
        {"name": "US_SOCIAL_SECURITY_NUMBER"},
        {"name": "PERSON_NAME"}
      ],
      "minLikelihood": "LIKELY",
      "limits": {
        "maxFindingsPerRequest": 1000
      }
    }
  }
}
EOF

gcloud dlp jobs create PROJECT_ID dlp_bigquery_scan.json

6.2 수동 데이터 분류 및 태깅

-- 테이블 라벨을 사용한 분류
ALTER TABLE `project.dataset.table_name`
SET OPTIONS (
  labels = [
    ("data_classification", "confidential"),
    ("data_category", "pii"),
    ("retention_period", "7_years"),
    ("compliance", "gdpr"),
    ("owner", "data_team")
  ]
);

-- 컬럼 설명을 통한 분류 정보 제공
ALTER TABLE `project.dataset.customers`
ALTER COLUMN email SET OPTIONS (
  description = "Customer email address - PII:EMAIL - GDPR:YES - Retention:7years"
);

ALTER TABLE `project.dataset.customers`  
ALTER COLUMN ssn SET OPTIONS (
  description = "Social Security Number - PII:SSN - Classification:CONFIDENTIAL - Access:HR_ONLY"
);

-- 메타데이터 쿼리로 분류 정보 확인
SELECT 
  table_name,
  column_name,
  data_type,
  description,
  REGEXP_EXTRACT(description, r'PII:(\w+)') as pii_type,
  REGEXP_EXTRACT(description, r'Classification:(\w+)') as classification,
  REGEXP_EXTRACT(description, r'Access:(\w+)') as access_level
FROM `project.dataset.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'customers'
  AND description LIKE '%PII:%';

6.3 데이터 거버넌스 정책

-- 데이터 분류별 보존 정책
CREATE OR REPLACE PROCEDURE `project.governance.apply_retention_policy`()
BEGIN
  -- PII 데이터는 7년 후 자동 삭제
  DECLARE retention_query STRING;
  
  FOR record IN (
    SELECT table_schema, table_name
    FROM `project.INFORMATION_SCHEMA.TABLES` t
    JOIN `project.INFORMATION_SCHEMA.TABLE_OPTIONS` o
      ON t.table_schema = o.table_schema 
      AND t.table_name = o.table_name
    WHERE o.option_name = 'labels'
      AND o.option_value LIKE '%"data_category":"pii"%'
  )
  DO
    SET retention_query = FORMAT(
      "ALTER TABLE `%s.%s` SET OPTIONS (partition_expiration_days = %d)",
      record.table_schema, 
      record.table_name,
      365 * 7  -- 7년
    );
    EXECUTE IMMEDIATE retention_query;
  END FOR;
END;

7. 암호화 및 키 관리

7.1 고객 관리형 암호화 키 (CMEK)

# KMS 키 링 생성
gcloud kms keyrings create bigquery-keyring \
  --location=global

# 암호화 키 생성
gcloud kms keys create bigquery-key \
  --location=global \
  --keyring=bigquery-keyring \
  --purpose=encryption

# BigQuery 서비스 계정에 키 사용 권한 부여
PROJECT_NUMBER=$(gcloud projects describe PROJECT_ID --format="value(projectNumber)")
gcloud kms keys add-iam-policy-binding bigquery-key \
  --location=global \
  --keyring=bigquery-keyring \
  --member="serviceAccount:bq-$PROJECT_NUMBER@bigquery-encryption.iam.gserviceaccount.com" \
  --role="roles/cloudkms.cryptoKeyEncrypterDecrypter"
-- CMEK로 암호화된 데이터세트 생성
CREATE SCHEMA `project.encrypted_dataset`
OPTIONS (
  location = "US",
  default_kms_key_name = "projects/PROJECT_ID/locations/global/keyRings/bigquery-keyring/cryptoKeys/bigquery-key"
);

-- CMEK로 암호화된 테이블 생성
CREATE OR REPLACE TABLE `project.encrypted_dataset.sensitive_data` (
  id STRING,
  encrypted_data STRING,
  created_at TIMESTAMP
)
OPTIONS (
  kms_key_name = "projects/PROJECT_ID/locations/global/keyRings/bigquery-keyring/cryptoKeys/bigquery-key"
);

7.2 어플리케이션 레벨 암호화

-- UDF를 사용한 필드 레벨 암호화
CREATE OR REPLACE FUNCTION `project.security.encrypt_pii`(
  plaintext STRING,
  key_name STRING
)
RETURNS STRING
LANGUAGE SQL
AS (
  -- 실제 구현에서는 외부 KMS 서비스 호출
  -- 여기서는 예시를 위한 간단한 해싱
  TO_BASE64(SHA256(CONCAT(plaintext, key_name)))
);

CREATE OR REPLACE FUNCTION `project.security.decrypt_pii`(
  ciphertext STRING,
  key_name STRING,
  authorized_user STRING  
)
RETURNS STRING
LANGUAGE SQL  
AS (
  -- 권한 확인 후 복호화
  CASE 
    WHEN SESSION_USER() = authorized_user THEN 
      -- 실제 복호화 로직 (KMS 호출)
      CONCAT('DECRYPTED:', ciphertext)
    ELSE 'ACCESS_DENIED'
  END
);

-- 암호화된 데이터 삽입
INSERT INTO `project.secure.customers` (
  customer_id,
  encrypted_email,
  encrypted_phone
) VALUES (
  'CUST001',
  `project.security.encrypt_pii`('user@example.com', 'email_key'),
  `project.security.encrypt_pii`('555-1234', 'phone_key')
);

7.3 키 회전 및 관리

# 키 버전 생성 (회전)
gcloud kms keys versions create \
  --location=global \
  --keyring=bigquery-keyring \
  --key=bigquery-key

# 키 회전 자동화 스케줄 설정
gcloud kms keys update bigquery-key \
  --location=global \
  --keyring=bigquery-keyring \
  --rotation-period=90d \
  --next-rotation-time=2024-04-01T00:00:00Z

# 키 사용량 모니터링
gcloud logging read 'resource.type="bigquery_dataset" AND 
  protoPayload.methodName="google.cloud.bigquery.v2.TableService.GetTable" AND
  protoPayload.resourceName=~"encrypted_dataset"' \
  --limit=50 \
  --format="table(timestamp,protoPayload.authenticationInfo.principalEmail)"

8. 감사 로그 및 모니터링

8.1 BigQuery 감사 로그 설정

# audit_policy.yaml
auditConfigs:
- service: bigquery.googleapis.com
  auditLogConfigs:
  - logType: DATA_READ
    exemptedMembers:
    - serviceAccount:monitoring@project.iam.gserviceaccount.com
  - logType: DATA_WRITE
  - logType: ADMIN_READ
# 감사 정책 적용
gcloud projects set-iam-policy PROJECT_ID audit_policy.yaml

8.2 실시간 보안 모니터링

-- 의심스러운 활동 탐지 쿼리
CREATE OR REPLACE VIEW `project.security.suspicious_activity` AS
WITH query_stats AS (
  SELECT 
    protopayload_auditlog.authenticationInfo.principalEmail as user_email,
    protopayload_auditlog.resourceName as resource,
    protopayload_auditlog.methodName as method,
    protopayload_auditlog.requestMetadata.callerIp as ip_address,
    TIMESTAMP(receiveTimestamp) as event_time,
    -- JSON 추출을 통한 쿼리 정보
    JSON_EXTRACT_SCALAR(protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobConfiguration.query.query) as query_text
  FROM `project.logs.cloudaudit_googleapis_com_data_access`
  WHERE DATE(TIMESTAMP(receiveTimestamp)) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
    AND protopayload_auditlog.serviceName = 'bigquery.googleapis.com'
    AND protopayload_auditlog.methodName = 'jobservice.jobcompleted'
),
anomaly_detection AS (
  SELECT 
    user_email,
    resource,
    ip_address,
    event_time,
    query_text,
    -- 이상 징후 탐지
    CASE 
      WHEN query_text LIKE '%SELECT *%' 
           AND REGEXP_CONTAINS(resource, r'sensitive|pii|confidential') THEN 'BULK_PII_ACCESS'
      WHEN EXTRACT(HOUR FROM event_time) NOT BETWEEN 6 AND 22 THEN 'OFF_HOURS_ACCESS'
      WHEN REGEXP_CONTAINS(query_text, r'DROP|DELETE|TRUNCATE') 
           AND user_email NOT LIKE '%admin%' THEN 'UNAUTHORIZED_MODIFICATION'
      WHEN LENGTH(query_text) > 10000 THEN 'COMPLEX_QUERY'
    END as anomaly_type
  FROM query_stats
),
ip_analysis AS (
  SELECT 
    user_email,
    COUNT(DISTINCT ip_address) as unique_ips,
    COUNT(*) as total_queries,
    ARRAY_AGG(DISTINCT ip_address ORDER BY ip_address) as ip_list
  FROM query_stats
  WHERE DATE(event_time) = CURRENT_DATE()
  GROUP BY user_email
  HAVING COUNT(DISTINCT ip_address) > 3  -- 하루에 3개 이상 IP 사용
)
SELECT 
  a.user_email,
  a.event_time,
  a.anomaly_type,
  a.resource,
  a.ip_address,
  a.query_text,
  i.unique_ips,
  i.ip_list
FROM anomaly_detection a
LEFT JOIN ip_analysis i ON a.user_email = i.user_email
WHERE a.anomaly_type IS NOT NULL
ORDER BY a.event_time DESC;

8.3 알림 및 대응 자동화

-- 보안 이벤트 알림 프로시저
CREATE OR REPLACE PROCEDURE `project.security.security_alert_handler`(
  alert_type STRING,
  user_email STRING,
  resource_name STRING,
  severity STRING
)
BEGIN
  -- 보안 이벤트 로그 기록
  INSERT INTO `project.security.security_events` (
    event_time,
    alert_type,
    user_email,
    resource_name,
    severity,
    status
  ) VALUES (
    CURRENT_TIMESTAMP(),
    alert_type,
    user_email,
    resource_name,
    severity,
    'DETECTED'
  );
  
  -- 심각도별 자동 대응
  IF severity = 'CRITICAL' THEN
    -- 사용자 계정 임시 비활성화 (외부 API 호출 필요)
    CALL `project.security.disable_user_access`(user_email);
    
    -- 즉시 알림 발송
    CALL `project.notifications.send_immediate_alert`(
      CONCAT('CRITICAL SECURITY ALERT: ', alert_type),
      CONCAT('User: ', user_email, ' Resource: ', resource_name)
    );
    
  ELSEIF severity = 'HIGH' THEN  
    -- 추가 인증 요구
    CALL `project.security.require_additional_auth`(user_email);
    
  END IF;
END;

9. 개인정보 보호 (Privacy)

9.1 GDPR 준수를 위한 데이터 처리

-- 개인정보 식별 및 분류
CREATE OR REPLACE VIEW `project.privacy.gdpr_data_inventory` AS
SELECT 
  table_schema,
  table_name,
  column_name,
  data_type,
  -- PII 유형 분류
  CASE 
    WHEN column_name LIKE '%email%' THEN 'EMAIL'
    WHEN column_name LIKE '%phone%' THEN 'PHONE' 
    WHEN column_name LIKE '%address%' THEN 'ADDRESS'
    WHEN column_name LIKE '%name%' THEN 'NAME'
    WHEN column_name LIKE '%ssn%' THEN 'SSN'
    WHEN column_name LIKE '%birth%' THEN 'BIRTH_DATE'
  END as pii_type,
  -- GDPR 관련 처리 근거
  REGEXP_EXTRACT(description, r'GDPR_BASIS:(\w+)') as processing_basis,
  REGEXP_EXTRACT(description, r'RETENTION:(\w+)') as retention_period
FROM `project.INFORMATION_SCHEMA.COLUMNS`
WHERE table_schema NOT IN ('INFORMATION_SCHEMA', 'logs')
  AND (column_name LIKE '%email%' 
       OR column_name LIKE '%phone%'
       OR column_name LIKE '%address%'
       OR column_name LIKE '%name%'
       OR column_name LIKE '%ssn%'
       OR column_name LIKE '%birth%'
       OR description LIKE '%PII%');

9.2 데이터 주체 권리 지원

-- 개인정보 접근 권리 (Right to Access)
CREATE OR REPLACE PROCEDURE `project.privacy.export_user_data`(
  user_identifier STRING,
  request_id STRING
)
BEGIN
  DECLARE export_query STRING;
  DECLARE table_list ARRAY<STRING>;
  
  -- 개인정보가 포함된 모든 테이블 식별
  SET table_list = (
    SELECT ARRAY_AGG(CONCAT(table_schema, '.', table_name))
    FROM `project.privacy.gdpr_data_inventory`
    WHERE pii_type IS NOT NULL
  );
  
  -- 사용자 데이터 추출 및 JSON 형태로 저장
  FOR table_name IN UNNEST(table_list) DO
    SET export_query = FORMAT("""
      INSERT INTO `project.privacy.data_export_requests` (
        request_id,
        table_name,
        exported_data,
        export_timestamp
      )
      SELECT 
        '%s' as request_id,
        '%s' as table_name,
        TO_JSON_STRING(STRUCT(*)) as exported_data,
        CURRENT_TIMESTAMP() as export_timestamp
      FROM `%s`
      WHERE user_id = '%s' OR email = '%s'
    """, request_id, table_name, table_name, user_identifier, user_identifier);
    
    EXECUTE IMMEDIATE export_query;
  END FOR;
END;

-- 삭제 권리 (Right to Erasure)
CREATE OR REPLACE PROCEDURE `project.privacy.delete_user_data`(
  user_identifier STRING,
  deletion_reason STRING
)
BEGIN
  DECLARE deletion_log STRING;
  
  -- 삭제 로그 기록
  INSERT INTO `project.privacy.deletion_log` (
    deletion_timestamp,
    user_identifier,
    reason,
    status
  ) VALUES (
    CURRENT_TIMESTAMP(),
    user_identifier,
    deletion_reason,
    'INITIATED'
  );
  
  -- 개인정보 포함 테이블에서 데이터 삭제
  DELETE FROM `project.data.customers` 
  WHERE customer_id = user_identifier OR email = user_identifier;
  
  DELETE FROM `project.data.orders` 
  WHERE customer_id = user_identifier;
  
  DELETE FROM `project.data.user_events` 
  WHERE user_id = user_identifier;
  
  -- 익명화 처리 (완전 삭제가 어려운 경우)
  UPDATE `project.analytics.user_behavior`
  SET 
    user_id = CONCAT('ANONYMIZED_', GENERATE_UUID()),
    email = NULL,
    name = 'ANONYMIZED_USER'
  WHERE user_id = user_identifier;
  
  -- 완료 로그 업데이트
  UPDATE `project.privacy.deletion_log`
  SET status = 'COMPLETED'
  WHERE user_identifier = user_identifier
    AND status = 'INITIATED';
END;

9.3 데이터 익명화 및 가명처리

-- k-익명성을 보장하는 데이터 익명화
CREATE OR REPLACE FUNCTION `project.privacy.k_anonymize`(
  age INT64,
  zip_code STRING,
  k_value INT64
)
RETURNS STRUCT<age_group STRING, zip_prefix STRING>
LANGUAGE SQL
AS (
  -- 연령대 그룹화 (k=5 기준)
  STRUCT(
    CASE 
      WHEN age < 18 THEN 'UNDER_18'
      WHEN age BETWEEN 18 AND 25 THEN '18-25'
      WHEN age BETWEEN 26 AND 35 THEN '26-35'
      WHEN age BETWEEN 36 AND 45 THEN '36-45'
      WHEN age BETWEEN 46 AND 55 THEN '46-55'
      WHEN age BETWEEN 56 AND 65 THEN '56-65'
      ELSE 'OVER_65'
    END as age_group,
    -- 우편번호 앞 3자리만 사용
    SUBSTR(zip_code, 1, 3) as zip_prefix
  )
);

-- 차등 프라이버시 적용 함수
CREATE OR REPLACE FUNCTION `project.privacy.add_noise`(
  value FLOAT64,
  epsilon FLOAT64,
  sensitivity FLOAT64
)
RETURNS FLOAT64
LANGUAGE SQL
AS (
  -- 라플라스 분포를 근사한 노이즈 추가
  value + (sensitivity / epsilon) * 
  (RAND() - RAND())  -- 간단한 노이즈 (실제로는 더 정교한 구현 필요)
);

-- 익명화된 분석용 뷰
CREATE OR REPLACE VIEW `project.analytics.anonymized_user_stats` AS
SELECT 
  anonymized.age_group,
  anonymized.zip_prefix,
  COUNT(*) as user_count,
  `project.privacy.add_noise`(AVG(total_spent), 0.1, 1000) as avg_spent_noisy,
  `project.privacy.add_noise`(SUM(total_spent), 0.1, 1000) as total_spent_noisy
FROM (
  SELECT 
    user_id,
    `project.privacy.k_anonymize`(age, zip_code, 5) as anonymized,
    total_spent
  FROM `project.data.customers`
) 
GROUP BY anonymized.age_group, anonymized.zip_prefix
HAVING COUNT(*) >= 5;  -- k-익명성 보장

10. 보안 모범 사례

10.1 최소 권한 원칙

-- 역할별 최소 권한 매트릭스
CREATE OR REPLACE TABLE `project.security.role_permissions` (
  role_name STRING,
  resource_type STRING,
  permissions ARRAY<STRING>,
  restrictions ARRAY<STRING>
);

INSERT INTO `project.security.role_permissions` VALUES
('data_analyst', 'dataset', ['bigquery.tables.getData', 'bigquery.jobs.create'], ['no_export', 'view_only']),
('data_scientist', 'dataset', ['bigquery.tables.getData', 'bigquery.tables.create', 'bigquery.jobs.create'], ['temp_tables_only']),
('data_engineer', 'dataset', ['bigquery.tables.*', 'bigquery.datasets.create'], ['no_sensitive_data']),
('business_user', 'view', ['bigquery.tables.getData'], ['predefined_views_only']);

-- 권한 검증 함수
CREATE OR REPLACE FUNCTION `project.security.check_permission`(
  user_role STRING,
  resource_type STRING,
  requested_permission STRING
)
RETURNS BOOL
LANGUAGE SQL
AS (
  requested_permission IN (
    SELECT permission
    FROM `project.security.role_permissions`,
    UNNEST(permissions) as permission
    WHERE role_name = user_role 
      AND resource_type = resource_type
  )
);

10.2 정기 보안 감사

-- 정기 보안 감사 리포트
CREATE OR REPLACE PROCEDURE `project.security.generate_audit_report`()
BEGIN
  -- 1. 권한 분석
  CREATE OR REPLACE TABLE `project.security.audit_permissions` AS
  SELECT 
    member,
    role,
    resource,
    CURRENT_TIMESTAMP() as audit_timestamp
  FROM (
    -- BigQuery IAM 권한 추출 (실제로는 API 호출 필요)
    SELECT 'user:analyst@company.com' as member, 'roles/bigquery.dataViewer' as role, 'project:dataset' as resource
  );
  
  -- 2. 미사용 권한 식별
  CREATE OR REPLACE TABLE `project.security.unused_permissions` AS
  SELECT 
    p.member,
    p.role,
    p.resource,
    TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX(l.event_time), DAY) as days_since_last_use
  FROM `project.security.audit_permissions` p
  LEFT JOIN `project.security.access_logs` l ON p.member = l.user_email
  GROUP BY p.member, p.role, p.resource
  HAVING days_since_last_use > 90 OR days_since_last_use IS NULL;
  
  -- 3. 권한 승급 검토
  CREATE OR REPLACE TABLE `project.security.privilege_escalation` AS
  SELECT 
    user_email,
    old_role,
    new_role,
    change_timestamp,
    approver
  FROM `project.security.role_changes`
  WHERE DATE(change_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
    AND new_role IN ('roles/bigquery.admin', 'roles/owner', 'roles/editor');
END;

10.3 보안 자동화

-- 자동 보안 정책 적용
CREATE OR REPLACE PROCEDURE `project.security.auto_security_enforcement`()
BEGIN
  -- 1. 새 테이블에 기본 보안 정책 적용
  FOR record IN (
    SELECT table_schema, table_name, creation_time
    FROM `project.INFORMATION_SCHEMA.TABLES`
    WHERE DATE(creation_time) = CURRENT_DATE()
      AND table_type = 'BASE TABLE'
      AND table_schema NOT LIKE '%temp%'
  )
  DO
    -- PII 가능성이 있는 컬럼 확인
    IF EXISTS (
      SELECT 1 FROM `project.INFORMATION_SCHEMA.COLUMNS`
      WHERE table_schema = record.table_schema 
        AND table_name = record.table_name
        AND (column_name LIKE '%email%' 
             OR column_name LIKE '%phone%'
             OR column_name LIKE '%ssn%')
    ) THEN
      -- 자동으로 제한적 접근 권한 설정
      EXECUTE IMMEDIATE FORMAT(
        "GRANT `roles/bigquery.dataViewer` ON TABLE `%s.%s` TO 'group:data-governance@company.com'",
        record.table_schema, record.table_name
      );
    END IF;
  END FOR;
  
  -- 2. 기본 행 레벨 보안 정책 적용
  -- 3. 데이터 분류 태그 자동 적용
  -- 4. 보존 정책 자동 설정
END;

-- 일일 보안 체크 스케줄링
CREATE OR REPLACE EVENT `project.security.daily_security_check`
ON SCHEDULE EVERY 1 DAY
DO
  CALL `project.security.auto_security_enforcement`();

10.4 사고 대응 절차

-- 보안 사고 대응 플레이북
CREATE OR REPLACE PROCEDURE `project.security.incident_response`(
  incident_type STRING,
  affected_resources ARRAY<STRING>,
  severity_level STRING
)
BEGIN
  DECLARE incident_id STRING DEFAULT GENERATE_UUID();
  
  -- 1. 사고 기록
  INSERT INTO `project.security.incidents` (
    incident_id,
    incident_type,
    affected_resources,
    severity_level,
    status,
    created_at
  ) VALUES (
    incident_id,
    incident_type,
    affected_resources,
    severity_level,
    'ACTIVE',
    CURRENT_TIMESTAMP()
  );
  
  -- 2. 자동 격리 조치
  IF severity_level = 'CRITICAL' THEN
    -- 영향받은 리소스에 대한 접근 차단
    FOR resource IN UNNEST(affected_resources) DO
      CALL `project.security.block_resource_access`(resource);
    END FOR;
  END IF;
  
  -- 3. 알림 발송
  CALL `project.notifications.send_incident_alert`(
    incident_id,
    incident_type,
    severity_level
  );
  
  -- 4. 포렌식 데이터 수집
  CALL `project.security.collect_forensic_data`(
    incident_id,
    affected_resources
  );
END;

BigQuery의 다층적 보안 기능을 적절히 활용하면 데이터의 기밀성, 무결성, 가용성을 보장하면서도 필요한 사용자들이 효율적으로 데이터를 활용할 수 있는 환경을 구축할 수 있습니다. 지속적인 보안 모니터링과 정책 업데이트를 통해 진화하는 보안 위협에 대응하는 것이 중요합니다.