BigQuery Streaming
Real-Time Data Ingestion
A comprehensive guide to real-time data streaming with BigQuery.
1. BigQuery Streaming Overview
1.1 What Is Streaming?
BigQuery streaming inserts rows into a table in near real time, making them available for queries within seconds of arrival.
1.2 Key Characteristics
- Real-time inserts: latency measured in seconds
- Immediate availability: rows can be queried while still in the streaming buffer
- High throughput: scales to very large row volumes per second, subject to per-project quotas
- De-duplication: best-effort duplicate removal via insertId
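These characteristics map onto fields of the underlying tabledata.insertAll REST request, which the Python client wraps. A sketch of the request shape as a Python dict; the row contents are illustrative:

# Illustrative insertAll request body (field names follow the REST API;
# the row payload itself is an example, not a fixed schema)
insert_all_request = {
    "kind": "bigquery#tableDataInsertAllRequest",
    "skipInvalidRows": False,       # fail the request if any row is invalid
    "ignoreUnknownValues": False,   # reject fields not in the table schema
    "rows": [
        {
            "insertId": "event-0001",  # best-effort de-duplication key
            "json": {"user_id": "12345", "event_type": "page_view"}
        }
    ]
}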
2. Streaming Inserts
2.1 Basic Streaming Insert
from google.cloud import bigquery

# Initialize the client
client = bigquery.Client()
table_id = "project.dataset.streaming_table"

# Rows to stream
rows_to_insert = [
    {
        "user_id": "12345",
        "event_type": "page_view",
        "timestamp": "2024-01-01 10:00:00",
        "page_url": "/home",
        "session_id": "session_123"
    },
    {
        "user_id": "67890",
        "event_type": "click",
        "timestamp": "2024-01-01 10:00:01",
        "button_id": "signup_btn",
        "session_id": "session_456"
    }
]

# Streaming insert
errors = client.insert_rows_json(table_id, rows_to_insert)

if errors:
    print(f"Errors occurred: {errors}")
else:
    print("Data streamed successfully")
2.2 Using De-duplication
import uuid
from datetime import datetime

def stream_with_deduplication(client, table_id, event_data):
    """Use an insertId for best-effort de-duplication."""
    # Generate a unique ID (random UUID)
    insert_id = str(uuid.uuid4())

    row = {
        "event_id": insert_id,
        "user_id": event_data["user_id"],
        "event_type": event_data["event_type"],
        "timestamp": datetime.utcnow().isoformat(),
        "properties": event_data.get("properties", {})
    }

    # row_ids supplies the insertId for each row, positionally
    errors = client.insert_rows_json(
        table_id,
        [row],
        row_ids=[insert_id]
    )
    return errors
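De-duplication only helps if retries reuse the same insertId. A minimal retry sketch under that assumption, with a fixed attempt count and backoff chosen purely for illustration:

import time
import uuid

def stream_with_retry(client, table_id, rows, max_attempts=3):
    """Retry transient failures while keeping insertIds stable,
    so BigQuery can drop duplicates left over from earlier attempts."""
    row_ids = [str(uuid.uuid4()) for _ in rows]  # generated once, reused on retry
    for attempt in range(max_attempts):
        errors = client.insert_rows_json(table_id, rows, row_ids=row_ids)
        if not errors:
            return []
        time.sleep(2 ** attempt)  # exponential backoff
    return errors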
2.3 Adapting to Schema Changes
def adaptive_streaming(client, table_id, data):
    """Streaming that tolerates unknown fields.

    Streaming inserts cannot add new columns by themselves; the table
    schema must be updated separately (see the example below). With
    ignore_unknown_values=True, fields that are not yet in the schema
    are silently dropped instead of failing the whole insert.
    """
    errors = client.insert_rows_json(
        table_id,
        data,
        ignore_unknown_values=True  # drop fields missing from the schema
    )
    return errors
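Because streaming inserts never alter the schema, new columns have to be added explicitly before events containing them are streamed. A sketch using the Python client; the column name new_field is hypothetical:

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("project.dataset.streaming_table")

# Append the new column to the existing schema (additions only;
# existing columns cannot be dropped or retyped this way)
new_schema = list(table.schema)
new_schema.append(bigquery.SchemaField("new_field", "STRING", mode="NULLABLE"))
table.schema = new_schema

client.update_table(table, ["schema"])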
3. Integrating with Dataflow
3.1 Apache Beam Pipeline
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run_streaming_pipeline():
    """Real-time data processing pipeline."""
    options = PipelineOptions([
        '--project=your-project',
        '--runner=DataflowRunner',
        '--streaming',
        '--region=us-central1'
    ])

    with beam.Pipeline(options=options) as p:
        (p
         # Read messages from Pub/Sub
         | 'Read from PubSub' >> beam.io.ReadFromPubSub(
             subscription='projects/project/subscriptions/events-sub'
         )
         # Parse JSON
         | 'Parse JSON' >> beam.Map(parse_json_message)
         # Transform the data
         | 'Transform Data' >> beam.Map(transform_event_data)
         # Key by user so GroupByKey has a key to group on
         | 'Key by User' >> beam.Map(lambda e: (e['user_id'], e))
         # Aggregate (windowed)
         | 'Window' >> beam.WindowInto(
             beam.window.FixedWindows(60)  # 1-minute windows
         )
         | 'Group by Key' >> beam.GroupByKey()
         | 'Aggregate' >> beam.Map(calculate_metrics)  # defined elsewhere
         # Stream into BigQuery
         | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             table='project:dataset.streaming_metrics',
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             method='STREAMING_INSERTS'
         ))

def parse_json_message(message):
    """Parse a JSON message payload."""
    import json
    return json.loads(message.decode('utf-8'))

def transform_event_data(event):
    """Transform an event record."""
    from datetime import datetime
    return {
        'user_id': event['user_id'],
        'event_type': event['event_type'],
        'timestamp': event['timestamp'],
        'processed_at': datetime.utcnow().isoformat()
    }
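Before deploying to Dataflow, the parse and transform steps can be exercised locally with the DirectRunner and an in-memory source. A minimal sketch, assuming the two functions above are importable:

import apache_beam as beam

def test_transforms_locally():
    """Run the parse/transform steps on sample data with the DirectRunner."""
    sample_messages = [
        b'{"user_id": "u1", "event_type": "click", "timestamp": "2024-01-01 10:00:00"}',
    ]
    # No runner specified, so Beam falls back to the local DirectRunner
    with beam.Pipeline() as p:
        (p
         | 'Create' >> beam.Create(sample_messages)
         | 'Parse' >> beam.Map(parse_json_message)
         | 'Transform' >> beam.Map(transform_event_data)
         | 'Print' >> beam.Map(print))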
3.2 Real-Time Aggregation Pipeline
def streaming_aggregation_pipeline(options):
    """Real-time aggregation pipeline (options as in section 3.1)."""
    with beam.Pipeline(options=options) as p:
        events = (p
                  | 'Read Events' >> beam.io.ReadFromPubSub(
                      subscription='projects/project/subscriptions/events-sub'
                  )
                  | 'Parse Events' >> beam.Map(parse_json_message)
                  )

        # Per-user session metrics
        user_metrics = (events
                        | 'Extract User Events' >> beam.Map(
                            lambda x: (x['user_id'], x)
                        )
                        | 'Window by Session' >> beam.WindowInto(
                            beam.window.Sessions(30 * 60)  # 30-minute gap, in seconds
                        )
                        | 'Group by User' >> beam.GroupByKey()
                        | 'Calculate User Metrics' >> beam.Map(calculate_user_session)
                        | 'Write User Metrics' >> beam.io.WriteToBigQuery(
                            'project:dataset.user_sessions'
                        )
                        )

        # Real-time dashboard metrics
        realtime_metrics = (events
                            | 'Window for Realtime' >> beam.WindowInto(
                                beam.window.FixedWindows(60)  # 1 minute
                            )
                            # without_defaults() is required for a global
                            # combine inside non-global windows
                            | 'Count Events' >> beam.CombineGlobally(
                                beam.combiners.CountCombineFn()
                            ).without_defaults()
                            | 'Format Metrics' >> beam.Map(format_realtime_metrics)
                            | 'Write Realtime' >> beam.io.WriteToBigQuery(
                                'project:dataset.realtime_dashboard'
                            )
                            )
    # calculate_user_session and format_realtime_metrics are assumed
    # to be defined elsewhere
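Fixed windows only emit a pane when the watermark passes the window end, which can feel slow on a dashboard. Early firings via a trigger reduce that latency; a sketch of the windowing step with an early processing-time trigger (the 10-second interval is illustrative):

from apache_beam.transforms import trigger

windowed = (events
            | 'Window with Early Firings' >> beam.WindowInto(
                beam.window.FixedWindows(60),
                # emit a speculative result every 10 s of processing time,
                # then a final result when the watermark closes the window
                trigger=trigger.AfterWatermark(
                    early=trigger.AfterProcessingTime(10)
                ),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING))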
4. Pub/Sub Integration
4.1 Streaming from Pub/Sub
# BigQuery cannot define an external table directly over a Pub/Sub
# subscription. Instead, create a Pub/Sub BigQuery subscription that
# writes messages straight into a table (topic and table names are
# illustrative):
gcloud pubsub subscriptions create event-stream \
    --topic=events \
    --bigquery-table=project:dataset.pubsub_stream \
    --write-metadata
-- Query the ingested messages in real time
SELECT
    JSON_VALUE(data, '$.user_id') AS user_id,
    JSON_VALUE(data, '$.event_type') AS event_type,
    PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S',
        JSON_VALUE(data, '$.timestamp')) AS event_time
FROM `project.dataset.pubsub_stream`
WHERE TIMESTAMP_DIFF(
    CURRENT_TIMESTAMP(),
    PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S',
        JSON_VALUE(data, '$.timestamp')),
    SECOND
) <= 300;  -- only the last 5 minutes
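Without a topic schema, the raw message payload lands in the `data` column, and --write-metadata adds a fixed set of metadata columns, so the target table must exist with a compatible schema. A sketch creating it with the Python client; verify the column set against the current Pub/Sub documentation:

from google.cloud import bigquery

client = bigquery.Client()

# Metadata columns written by a BigQuery subscription; the raw
# message payload lands in the `data` column
schema = [
    bigquery.SchemaField("subscription_name", "STRING"),
    bigquery.SchemaField("message_id", "STRING"),
    bigquery.SchemaField("publish_time", "TIMESTAMP"),
    bigquery.SchemaField("data", "STRING"),
    bigquery.SchemaField("attributes", "STRING"),
]

table = bigquery.Table("project.dataset.pubsub_stream", schema=schema)
client.create_table(table, exists_ok=True)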
4.2 Streaming Transformation
def pubsub_to_bigquery_streaming():
    """Stream from Pub/Sub into BigQuery in real time."""
    from google.cloud import pubsub_v1, bigquery
    from datetime import datetime
    import json

    # Initialize clients
    subscriber = pubsub_v1.SubscriberClient()
    bq_client = bigquery.Client()

    subscription_path = subscriber.subscription_path(
        'project-id', 'subscription-name'
    )
    table_id = 'project.dataset.events'

    def callback(message):
        """Per-message processing callback."""
        try:
            # Parse JSON
            event_data = json.loads(message.data.decode('utf-8'))

            # Transform
            transformed_data = {
                'event_id': event_data.get('id'),
                'user_id': event_data.get('user_id'),
                'event_type': event_data.get('type'),
                'timestamp': event_data.get('timestamp'),
                'properties': json.dumps(event_data.get('properties', {})),
                'processed_at': datetime.utcnow().isoformat()
            }

            # Streaming insert into BigQuery (one row per message;
            # see section 6 for batching, which is far more efficient)
            errors = bq_client.insert_rows_json(table_id, [transformed_data])

            if not errors:
                message.ack()
                print(f"Successfully processed message: {message.message_id}")
            else:
                print(f"Errors inserting to BigQuery: {errors}")
                message.nack()

        except Exception as e:
            print(f"Error processing message: {e}")
            message.nack()

    # Start the streaming pull
    streaming_pull_future = subscriber.subscribe(
        subscription_path,
        callback=callback,
        flow_control=pubsub_v1.types.FlowControl(max_messages=1000)
    )

    print(f"Listening for messages on {subscription_path}")

    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()
5. Real-Time Analytics
5.1 Streaming Analysis Queries
-- Real-time user activity analysis
WITH realtime_events AS (
    SELECT
        user_id,
        event_type,
        PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S', timestamp) AS event_time,
        JSON_EXTRACT_SCALAR(properties, '$.page_url') AS page_url
    FROM `project.dataset.streaming_events`
    WHERE TIMESTAMP_DIFF(
        CURRENT_TIMESTAMP(),
        PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S', timestamp),
        MINUTE
    ) <= 5  -- last 5 minutes
),
user_sessions AS (
    SELECT
        user_id,
        -- distinct active minutes, as a rough session proxy
        COUNT(DISTINCT TIMESTAMP_TRUNC(event_time, MINUTE)) AS active_minutes,
        COUNT(*) AS total_events,
        COUNT(DISTINCT event_type) AS event_types,
        MIN(event_time) AS first_event,
        MAX(event_time) AS last_event
    FROM realtime_events
    GROUP BY user_id
)
SELECT
    COUNT(DISTINCT user_id) AS active_users,
    SUM(total_events) AS total_events,
    AVG(total_events) AS avg_events_per_user,
    COUNT(CASE WHEN total_events >= 10 THEN 1 END) AS highly_active_users
FROM user_sessions;
5.2 Real-Time Anomaly Detection
-- Real-time traffic anomaly detection
WITH minute_metrics AS (
    SELECT
        TIMESTAMP_TRUNC(
            PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S', timestamp),
            MINUTE
        ) AS minute,
        COUNT(*) AS events_count,
        COUNT(DISTINCT user_id) AS unique_users,
        COUNT(DISTINCT session_id) AS unique_sessions
    FROM `project.dataset.streaming_events`
    WHERE TIMESTAMP_DIFF(
        CURRENT_TIMESTAMP(),
        PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S', timestamp),
        MINUTE
    ) <= 60  -- last hour
    GROUP BY minute
),
baseline_stats AS (
    SELECT
        AVG(events_count) AS avg_events,
        STDDEV(events_count) AS stddev_events,
        AVG(unique_users) AS avg_users,
        STDDEV(unique_users) AS stddev_users
    FROM minute_metrics
    WHERE minute <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 10 MINUTE)
)
SELECT
    mm.minute,
    mm.events_count,
    mm.unique_users,
    bs.avg_events,
    bs.avg_users,
    -- anomaly flags: more than 2 standard deviations from the baseline
    CASE
        WHEN mm.events_count > bs.avg_events + 2 * bs.stddev_events THEN 'HIGH_TRAFFIC'
        WHEN mm.events_count < bs.avg_events - 2 * bs.stddev_events THEN 'LOW_TRAFFIC'
        ELSE 'NORMAL'
    END AS traffic_anomaly,
    CASE
        WHEN mm.unique_users > bs.avg_users + 2 * bs.stddev_users THEN 'USER_SPIKE'
        WHEN mm.unique_users < bs.avg_users - 2 * bs.stddev_users THEN 'USER_DROP'
        ELSE 'NORMAL'
    END AS user_anomaly
FROM minute_metrics mm
CROSS JOIN baseline_stats bs
WHERE mm.minute >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 10 MINUTE)
ORDER BY mm.minute DESC;
6. Performance Optimization
6.1 Batch Size Tuning
import time

class OptimizedStreamer:
    def __init__(self, client, table_id, batch_size=1000, batch_timeout=5):
        self.client = client
        self.table_id = table_id
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.batch = []
        self.last_flush = time.time()

    def add_event(self, event):
        """Add an event (batched)."""
        self.batch.append(event)

        # Flush when the batch is full or the timeout has elapsed
        if (len(self.batch) >= self.batch_size or
                time.time() - self.last_flush >= self.batch_timeout):
            self.flush_batch()

    def flush_batch(self):
        """Send the current batch."""
        if not self.batch:
            return

        errors = self.client.insert_rows_json(
            self.table_id,
            self.batch
        )

        if errors:
            print(f"Batch insert errors: {errors}")
        else:
            print(f"Successfully inserted {len(self.batch)} rows")

        self.batch = []
        self.last_flush = time.time()
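Typical usage, assuming a hypothetical event_source() generator that yields row dicts:

from google.cloud import bigquery

client = bigquery.Client()
streamer = OptimizedStreamer(
    client,
    "project.dataset.streaming_table",
    batch_size=500,   # smaller batches flush sooner
    batch_timeout=2   # seconds before a partial batch is flushed
)

for event in event_source():  # hypothetical event generator
    streamer.add_event(event)

streamer.flush_batch()  # flush any remaining rows on shutdown

Note that the timeout is only checked when a new event arrives; for strict time-based flushing, combine this with a background worker as in section 6.3.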
6.2 Table Partitioning
-- Partitioned table optimized for streaming
CREATE OR REPLACE TABLE `project.dataset.streaming_events` (
    event_id STRING,
    user_id STRING,
    event_type STRING,
    timestamp TIMESTAMP,
    properties JSON,
    _inserted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
)
PARTITION BY DATE(_inserted_at)
CLUSTER BY user_id, event_type
OPTIONS (
    partition_expiration_days = 365,
    require_partition_filter = false  -- false is recommended for streaming workloads
);
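Even without an enforced partition filter, queries against this table should filter on the partitioning column so BigQuery can prune partitions and scan less data. A sketch of a cost-conscious query via the Python client:

from google.cloud import bigquery

client = bigquery.Client()

# The DATE(_inserted_at) predicate lets BigQuery prune to a single partition
query = """
    SELECT event_type, COUNT(*) AS events
    FROM `project.dataset.streaming_events`
    WHERE DATE(_inserted_at) = CURRENT_DATE()
    GROUP BY event_type
"""
for row in client.query(query).result():
    print(row.event_type, row.events)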
6.3 Streaming Buffer Optimization
import queue
import threading

class StreamingBuffer:
    """In-memory buffer with a background worker for higher throughput."""

    def __init__(self, client, table_id):
        self.client = client
        self.table_id = table_id
        self.buffer = queue.Queue(maxsize=10000)
        self.running = True

        # Start the background worker thread
        self.worker_thread = threading.Thread(
            target=self._process_buffer
        )
        self.worker_thread.start()

    def add_event(self, event):
        """Add an event to the buffer."""
        try:
            self.buffer.put(event, timeout=1)
        except queue.Full:
            print("Buffer full, dropping event")

    def _process_buffer(self):
        """Drain the buffer in batches on the background thread."""
        batch = []
        while self.running or not self.buffer.empty():
            try:
                # Collect a batch (wait up to 1 second per item)
                while len(batch) < 1000:
                    try:
                        event = self.buffer.get(timeout=1)
                        batch.append(event)
                        self.buffer.task_done()
                    except queue.Empty:
                        break

                # Send the batch
                if batch:
                    errors = self.client.insert_rows_json(
                        self.table_id, batch
                    )
                    if errors:
                        print(f"Insert errors: {errors}")
                    batch = []

            except Exception as e:
                print(f"Buffer processing error: {e}")

    def close(self):
        """Stop the worker after it drains the remaining events."""
        self.running = False
        self.worker_thread.join()
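A short usage sketch, assuming the table from section 2:

from google.cloud import bigquery

client = bigquery.Client()
buffer = StreamingBuffer(client, "project.dataset.events")

buffer.add_event({
    "user_id": "12345",
    "event_type": "page_view",
    "timestamp": "2024-01-01 10:00:00",
})

buffer.close()  # stops the worker once remaining events are drained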
7. Practical Use Cases
7.1 Real-Time Web Analytics
-- Real-time website traffic dashboard
CREATE OR REPLACE VIEW `project.analytics.realtime_dashboard` AS
WITH recent_events AS (
    SELECT *
    FROM `project.dataset.streaming_events`
    WHERE TIMESTAMP_DIFF(
        CURRENT_TIMESTAMP(),
        PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S', timestamp),
        MINUTE
    ) <= 30
),
page_views AS (
    SELECT
        JSON_EXTRACT_SCALAR(properties, '$.page_url') AS page_url,
        COUNT(*) AS views,
        COUNT(DISTINCT user_id) AS unique_visitors
    FROM recent_events
    WHERE event_type = 'page_view'
    GROUP BY page_url
),
traffic_sources AS (
    SELECT
        JSON_EXTRACT_SCALAR(properties, '$.referrer') AS traffic_source,
        COUNT(DISTINCT user_id) AS users,
        COUNT(*) AS sessions
    FROM recent_events
    WHERE event_type = 'session_start'
    GROUP BY traffic_source
),
conversion_funnel AS (
    SELECT
        'page_view' AS step,
        1 AS step_order,
        COUNT(DISTINCT user_id) AS users
    FROM recent_events
    WHERE event_type = 'page_view'
    UNION ALL
    SELECT
        'add_to_cart' AS step,
        2 AS step_order,
        COUNT(DISTINCT user_id) AS users
    FROM recent_events
    WHERE event_type = 'add_to_cart'
    UNION ALL
    SELECT
        'purchase' AS step,
        3 AS step_order,
        COUNT(DISTINCT user_id) AS users
    FROM recent_events
    WHERE event_type = 'purchase'
)
SELECT
    CURRENT_TIMESTAMP() AS dashboard_timestamp,
    -- traffic metrics
    (SELECT COUNT(*) FROM recent_events) AS total_events,
    (SELECT COUNT(DISTINCT user_id) FROM recent_events) AS active_users,
    (SELECT COUNT(DISTINCT session_id) FROM recent_events) AS active_sessions,
    -- top pages
    ARRAY(
        SELECT STRUCT(page_url, views, unique_visitors)
        FROM page_views
        ORDER BY views DESC
        LIMIT 10
    ) AS top_pages,
    -- traffic sources
    ARRAY(
        SELECT STRUCT(traffic_source, users)
        FROM traffic_sources
        ORDER BY users DESC
        LIMIT 5
    ) AS traffic_sources,
    -- conversion funnel
    ARRAY(
        SELECT STRUCT(step, users)
        FROM conversion_funnel
        ORDER BY step_order
    ) AS conversion_funnel;
7.2 Real-Time IoT Sensor Processing
def iot_streaming_pipeline(pipeline_options):
    """Process IoT sensor data in real time."""

    def process_sensor_data(element):
        """Parse a sensor reading and flag anomalies."""
        import json
        from datetime import datetime

        data = json.loads(element)

        # Normalize the sensor value
        normalized_value = normalize_sensor_value(
            data['sensor_type'],
            data['value']
        )

        # Detect outliers
        is_anomaly = detect_anomaly(
            data['sensor_id'],
            normalized_value
        )

        return {
            'sensor_id': data['sensor_id'],
            'sensor_type': data['sensor_type'],
            'location': data['location'],
            'timestamp': data['timestamp'],
            'raw_value': data['value'],
            'normalized_value': normalized_value,
            'is_anomaly': is_anomaly,
            'processed_at': datetime.utcnow().isoformat()
        }

    with beam.Pipeline(options=pipeline_options) as p:
        # Sensor data stream
        sensor_stream = (p
                         | 'Read Sensor Data' >> beam.io.ReadFromPubSub(
                             subscription='projects/project/subscriptions/sensors'
                         )
                         | 'Process Data' >> beam.Map(process_sensor_data)
                         )

        # Normal readings go to the aggregate table
        normal_data = (sensor_stream
                       | 'Filter Normal' >> beam.Filter(lambda x: not x['is_anomaly'])
                       | 'Window 5min' >> beam.WindowInto(
                           beam.window.FixedWindows(300)  # 5 minutes
                       )
                       | 'Key by Location' >> beam.Map(lambda x: (x['location'], x))
                       | 'Group by Location' >> beam.GroupByKey()
                       | 'Calculate Stats' >> beam.Map(calculate_sensor_stats)  # defined elsewhere
                       | 'Write Aggregated' >> beam.io.WriteToBigQuery(
                           'project:iot.sensor_aggregates'
                       )
                       )

        # Anomalous readings go to the alert table
        anomaly_data = (sensor_stream
                        | 'Filter Anomalies' >> beam.Filter(lambda x: x['is_anomaly'])
                        | 'Write Anomalies' >> beam.io.WriteToBigQuery(
                            'project:iot.sensor_anomalies',
                            method='STREAMING_INSERTS'
                        )
                        )
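The pipeline above assumes normalize_sensor_value and detect_anomaly exist. Minimal placeholder implementations, with the sensor ranges and margin chosen purely for illustration:

# Assumed physical ranges per sensor type (illustrative values)
SENSOR_RANGES = {
    "temperature": (-40.0, 85.0),   # degrees Celsius
    "humidity": (0.0, 100.0),       # percent
}

def normalize_sensor_value(sensor_type, value):
    """Scale a raw reading into [0, 1] using the sensor's assumed range."""
    low, high = SENSOR_RANGES.get(sensor_type, (0.0, 1.0))
    return (value - low) / (high - low)

def detect_anomaly(sensor_id, normalized_value, margin=0.05):
    """Naive check: flag readings within `margin` of the physical limits.
    A production system would track per-sensor baselines instead."""
    return normalized_value < margin or normalized_value > 1.0 - margin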
7.3 Real-Time Recommendation System
-- Real-time recommendations based on recent user behavior
CREATE OR REPLACE PROCEDURE `project.ml.update_realtime_recommendations`()
BEGIN
    -- Analyze recent activity
    CREATE OR REPLACE TEMP TABLE user_recent_activity AS
    SELECT
        user_id,
        ARRAY_AGG(
            JSON_EXTRACT_SCALAR(properties, '$.product_id') IGNORE NULLS
            ORDER BY PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S', timestamp) DESC
            LIMIT 10
        ) AS recent_products,
        ARRAY_AGG(DISTINCT
            JSON_EXTRACT_SCALAR(properties, '$.category') IGNORE NULLS
        ) AS interested_categories,
        COUNT(*) AS activity_score
    FROM `project.dataset.streaming_events`
    WHERE event_type IN ('view', 'click', 'add_to_cart')
        AND TIMESTAMP_DIFF(
            CURRENT_TIMESTAMP(),
            PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S', timestamp),
            HOUR
        ) <= 2  -- last 2 hours
    GROUP BY user_id
    HAVING COUNT(*) >= 3;  -- at least 3 events

    -- Refresh the real-time recommendations
    MERGE `project.recommendations.realtime_user_recs` target
    USING (
        SELECT
            ura.user_id,
            ARRAY(
                SELECT AS STRUCT
                    p.product_id,
                    p.product_name,
                    p.category,
                    -- real-time score
                    (0.5 * p.popularity_score +
                     0.3 * IF(p.category IN UNNEST(ura.interested_categories), 100, 0) +
                     0.2 * ura.activity_score) AS recommendation_score
                FROM `project.master.products` p
                WHERE p.status = 'active'
                    AND p.product_id NOT IN UNNEST(ura.recent_products)  -- exclude recently viewed products
                    AND (p.category IN UNNEST(ura.interested_categories)
                         OR p.popularity_score > 80)
                ORDER BY recommendation_score DESC
                LIMIT 20
            ) AS recommendations,
            CURRENT_TIMESTAMP() AS updated_at
        FROM user_recent_activity ura
    ) source
    ON target.user_id = source.user_id
    WHEN MATCHED THEN UPDATE SET
        recommendations = source.recommendations,
        updated_at = source.updated_at
    WHEN NOT MATCHED THEN INSERT (
        user_id, recommendations, updated_at
    ) VALUES (
        source.user_id, source.recommendations, source.updated_at
    );
END;
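The procedure can be invoked from the Python client; in practice you would run it on a schedule, for example from a BigQuery scheduled query or a Cloud Scheduler job that issues the same CALL:

from google.cloud import bigquery

client = bigquery.Client()

# Run the refresh; result() blocks until the procedure finishes
client.query(
    "CALL `project.ml.update_realtime_recommendations`()"
).result()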
With BigQuery streaming you can ingest large volumes of data in real time and query it within seconds of arrival. Combined with sensible partitioning, batching, and buffering, this supports performant and reliable real-time data pipelines.