API 활용
BigQuery API 프로그래밍
BigQuery의 다양한 API를 활용한 프로그래밍 방식의 데이터 처리 방법을 다루는 가이드입니다.
목차
1. BigQuery API 개요
1.1 지원하는 API 유형
- REST API: HTTP 기반 직접 호출
- 클라이언트 라이브러리: Python, Java, Node.js, Go 등
- gRPC API: 고성능 스트리밍
- Storage Write API: 고성능 스트리밍 삽입
1.2 인증 설정
from google.cloud import bigquery
from google.oauth2 import service_account
# 서비스 계정 키 파일 사용
credentials = service_account.Credentials.from_service_account_file(
'path/to/service-account-key.json'
)
client = bigquery.Client(credentials=credentials, project='your-project-id')
# 기본 인증 (gcloud auth application-default login 사용)
client = bigquery.Client()
2. Python 클라이언트 라이브러리
2.1 기본 쿼리 실행
from google.cloud import bigquery
import pandas as pd
def run_query(sql_query):
"""기본 쿼리 실행"""
client = bigquery.Client()
# 쿼리 실행
query_job = client.query(sql_query)
# 결과 가져오기
results = query_job.result()
# DataFrame으로 변환
df = results.to_dataframe()
return df
# 사용 예시
sql = """
SELECT
customer_id,
SUM(order_amount) as total_amount
FROM `project.dataset.orders`
WHERE order_date >= '2024-01-01'
GROUP BY customer_id
ORDER BY total_amount DESC
LIMIT 10
"""
top_customers = run_query(sql)
print(top_customers)
2.2 테이블 생성 및 데이터 로드
def create_table_from_dataframe(df, table_id, schema=None):
"""DataFrame에서 테이블 생성"""
client = bigquery.Client()
# 작업 구성
job_config = bigquery.LoadJobConfig()
if schema:
job_config.schema = schema
else:
job_config.autodetect = True
# 기존 테이블 덮어쓰기
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
# 데이터 로드
job = client.load_table_from_dataframe(
df, table_id, job_config=job_config
)
job.result() # 작업 완료까지 대기
print(f"Loaded {job.output_rows} rows into {table_id}")
# 사용 예시
import pandas as pd
sample_data = pd.DataFrame({
'customer_id': [1, 2, 3],
'customer_name': ['Alice', 'Bob', 'Charlie'],
'signup_date': ['2024-01-01', '2024-01-02', '2024-01-03']
})
create_table_from_dataframe(
sample_data,
'project.dataset.customers'
)
2.3 배치 작업 관리
def run_batch_queries(queries):
"""여러 쿼리를 배치로 실행"""
client = bigquery.Client()
jobs = []
# 모든 쿼리 시작
for i, query in enumerate(queries):
job_config = bigquery.QueryJobConfig(
job_id=f"batch_job_{i}_{int(time.time())}"
)
job = client.query(query, job_config=job_config)
jobs.append(job)
print(f"Started job: {job.job_id}")
# 모든 작업 완료 대기
results = []
for job in jobs:
result = job.result()
results.append({
'job_id': job.job_id,
'state': job.state,
'total_bytes_processed': job.total_bytes_processed,
'rows': [dict(row) for row in result]
})
return results
# 사용 예시
batch_queries = [
"SELECT COUNT(*) as total_orders FROM `project.dataset.orders`",
"SELECT COUNT(DISTINCT customer_id) as unique_customers FROM `project.dataset.orders`",
"SELECT AVG(order_amount) as avg_order_amount FROM `project.dataset.orders`"
]
batch_results = run_batch_queries(batch_queries)
for result in batch_results:
print(f"Job {result['job_id']}: {result['rows']}")
2.4 동적 쿼리 생성
class DynamicQueryBuilder:
"""동적 쿼리 빌더 클래스"""
def __init__(self, base_table):
self.base_table = base_table
self.select_fields = []
self.where_conditions = []
self.group_by_fields = []
self.order_by_fields = []
self.limit_count = None
def select(self, *fields):
self.select_fields.extend(fields)
return self
def where(self, condition):
self.where_conditions.append(condition)
return self
def group_by(self, *fields):
self.group_by_fields.extend(fields)
return self
def order_by(self, field, direction='ASC'):
self.order_by_fields.append(f"{field} {direction}")
return self
def limit(self, count):
self.limit_count = count
return self
def build(self):
"""쿼리 문자열 생성"""
query_parts = []
# SELECT
select_clause = "SELECT " + ", ".join(self.select_fields or ["*"])
query_parts.append(select_clause)
# FROM
query_parts.append(f"FROM `{self.base_table}`")
# WHERE
if self.where_conditions:
where_clause = "WHERE " + " AND ".join(self.where_conditions)
query_parts.append(where_clause)
# GROUP BY
if self.group_by_fields:
group_clause = "GROUP BY " + ", ".join(self.group_by_fields)
query_parts.append(group_clause)
# ORDER BY
if self.order_by_fields:
order_clause = "ORDER BY " + ", ".join(self.order_by_fields)
query_parts.append(order_clause)
# LIMIT
if self.limit_count:
query_parts.append(f"LIMIT {self.limit_count}")
return "\n".join(query_parts)
def execute(self):
"""쿼리 실행"""
client = bigquery.Client()
query = self.build()
print(f"Executing query:\n{query}")
return client.query(query).result().to_dataframe()
# 사용 예시
builder = DynamicQueryBuilder('project.dataset.orders')
result = (builder
.select('customer_id', 'SUM(order_amount) as total_spent')
.where('order_date >= "2024-01-01"')
.where('order_amount > 100')
.group_by('customer_id')
.order_by('total_spent', 'DESC')
.limit(10)
.execute()
)
3. JavaScript/Node.js 활용
3.1 기본 설정
// npm install @google-cloud/bigquery
const { BigQuery } = require('@google-cloud/bigquery');
// 클라이언트 초기화
const bigquery = new BigQuery({
projectId: 'your-project-id',
keyFilename: 'path/to/service-account-key.json'
});
async function runQuery(sqlQuery) {
const options = {
query: sqlQuery,
location: 'US',
};
try {
const [job] = await bigquery.createQueryJob(options);
console.log(`Job ${job.id} started.`);
const [rows] = await job.getQueryResults();
return rows;
} catch (error) {
console.error('Query failed:', error);
throw error;
}
}
// 사용 예시
async function getTopCustomers() {
const query = `
SELECT
customer_id,
SUM(order_amount) as total_amount
FROM \`project.dataset.orders\`
GROUP BY customer_id
ORDER BY total_amount DESC
LIMIT 5
`;
const rows = await runQuery(query);
console.log('Top customers:');
rows.forEach(row => {
console.log(`Customer ${row.customer_id}: $${row.total_amount}`);
});
}
3.2 스트리밍 삽입
async function streamData(tableId, data) {
const dataset = bigquery.dataset('your-dataset');
const table = dataset.table(tableId);
try {
await table.insert(data);
console.log(`Inserted ${data.length} rows`);
} catch (error) {
if (error.name === 'PartialFailureError') {
console.log('Some rows failed to insert:');
error.errors.forEach(err => console.log(err));
} else {
console.error('Insert failed:', error);
}
}
}
// 사용 예시
const eventData = [
{
user_id: '12345',
event_type: 'page_view',
timestamp: new Date().toISOString(),
properties: { page: '/home' }
},
{
user_id: '67890',
event_type: 'click',
timestamp: new Date().toISOString(),
properties: { button: 'signup' }
}
];
streamData('events', eventData);
4. REST API 직접 호출
4.1 HTTP 요청 예시
import requests
import json
from google.auth.transport.requests import Request
from google.oauth2 import service_account
def get_access_token():
"""액세스 토큰 획득"""
credentials = service_account.Credentials.from_service_account_file(
'path/to/service-account-key.json',
scopes=['https://www.googleapis.com/auth/bigquery']
)
credentials.refresh(Request())
return credentials.token
def run_query_rest_api(project_id, sql_query):
"""REST API로 쿼리 실행"""
token = get_access_token()
url = f'https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/queries'
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
payload = {
'query': sql_query,
'useLegacySql': False,
'maxResults': 1000
}
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Query failed: {response.text}")
# 사용 예시
result = run_query_rest_api(
'your-project-id',
'SELECT COUNT(*) as total FROM `project.dataset.table`'
)
print(json.dumps(result, indent=2))
4.2 테이블 메타데이터 조회
def get_table_metadata(project_id, dataset_id, table_id):
"""테이블 메타데이터 조회"""
token = get_access_token()
url = f'https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}'
headers = {'Authorization': f'Bearer {token}'}
response = requests.get(url, headers=headers)
if response.status_code == 200:
metadata = response.json()
return {
'table_id': metadata.get('id'),
'description': metadata.get('description'),
'num_rows': metadata.get('numRows'),
'num_bytes': metadata.get('numBytes'),
'creation_time': metadata.get('creationTime'),
'last_modified': metadata.get('lastModifiedTime'),
'schema_fields': len(metadata.get('schema', {}).get('fields', []))
}
else:
raise Exception(f"Failed to get metadata: {response.text}")
# 사용 예시
metadata = get_table_metadata('project', 'dataset', 'table')
print(f"Table has {metadata['num_rows']} rows and {metadata['schema_fields']} columns")
5. 스트리밍 API
5.1 Storage Write API 활용
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
import json
def stream_data_storage_api(project_id, dataset_id, table_id, data):
"""Storage Write API를 사용한 고성능 스트리밍"""
client = bigquery_storage_v1.BigQueryWriteClient()
table_path = client.table_path(project_id, dataset_id, table_id)
# 스트림 생성
write_stream = types.WriteStream()
write_stream.type_ = types.WriteStream.Type.COMMITTED
write_stream = client.create_write_stream(
parent=table_path,
write_stream=write_stream
)
# 스트리밍 writer 생성
stream_writer = writer.AppendRowsStream(
client, write_stream.name
)
try:
# 데이터 전송
for batch in data:
# JSON 직렬화
serialized_rows = []
for row in batch:
serialized_rows.append(json.dumps(row).encode('utf-8'))
# 요청 생성
append_request = types.AppendRowsRequest()
append_request.write_stream = write_stream.name
json_rows = types.AppendRowsRequest.ProtoData()
json_rows.rows.serialized_rows = serialized_rows
append_request.proto_rows = json_rows
# 전송
response = stream_writer.send(append_request)
print(f"Streamed batch, offset: {response.append_result.offset}")
finally:
# 스트림 종료
stream_writer.close()
# 스트림 완료
client.finalize_write_stream(name=write_stream.name)
# 사용 예시
sample_data = [
[
{'user_id': '123', 'action': 'login', 'timestamp': '2024-01-01T10:00:00'},
{'user_id': '456', 'action': 'logout', 'timestamp': '2024-01-01T11:00:00'}
]
]
stream_data_storage_api('project', 'dataset', 'events', sample_data)
6. 실제 활용 사례
6.1 실시간 ETL 파이프라인
class RealTimeETLPipeline:
"""실시간 ETL 파이프라인"""
def __init__(self, project_id):
self.project_id = project_id
self.client = bigquery.Client(project=project_id)
def extract_from_source(self, source_table, last_sync_time):
"""소스에서 데이터 추출"""
query = f"""
SELECT *
FROM `{source_table}`
WHERE updated_at > TIMESTAMP('{last_sync_time}')
ORDER BY updated_at
"""
return self.client.query(query).result().to_dataframe()
def transform_data(self, df):
"""데이터 변환"""
# 데이터 정제
df = df.dropna(subset=['customer_id'])
# 새 컬럼 추가
df['full_name'] = df['first_name'] + ' ' + df['last_name']
df['processed_at'] = pd.Timestamp.now()
# 데이터 타입 변환
df['customer_id'] = df['customer_id'].astype(str)
return df
def load_to_target(self, df, target_table):
"""타겟 테이블로 로드"""
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job_config.schema_update_options = [
bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
]
job = self.client.load_table_from_dataframe(
df, target_table, job_config=job_config
)
job.result()
return job.output_rows
def run_etl(self, source_table, target_table, last_sync_time):
"""ETL 실행"""
try:
print(f"Starting ETL: {source_table} -> {target_table}")
# Extract
df = self.extract_from_source(source_table, last_sync_time)
print(f"Extracted {len(df)} rows")
if len(df) == 0:
print("No new data to process")
return
# Transform
df_transformed = self.transform_data(df)
print(f"Transformed data: {df_transformed.shape}")
# Load
rows_loaded = self.load_to_target(df_transformed, target_table)
print(f"Loaded {rows_loaded} rows to {target_table}")
return df_transformed['updated_at'].max()
except Exception as e:
print(f"ETL failed: {e}")
raise
# 사용 예시
pipeline = RealTimeETLPipeline('your-project')
last_sync = pipeline.run_etl(
'project.raw.customers',
'project.processed.customers',
'2024-01-01 00:00:00'
)
6.2 데이터 품질 모니터링
class DataQualityMonitor:
"""데이터 품질 모니터링 시스템"""
def __init__(self, project_id):
self.client = bigquery.Client(project=project_id)
def check_data_quality(self, table_id):
"""데이터 품질 검사"""
checks = {
'total_rows': self._count_rows(table_id),
'null_percentage': self._calculate_null_percentage(table_id),
'duplicate_percentage': self._calculate_duplicate_percentage(table_id),
'data_freshness_hours': self._calculate_data_freshness(table_id)
}
# 품질 점수 계산
quality_score = self._calculate_quality_score(checks)
checks['quality_score'] = quality_score
# 결과 저장
self._save_quality_results(table_id, checks)
return checks
def _count_rows(self, table_id):
query = f"SELECT COUNT(*) as count FROM `{table_id}`"
result = list(self.client.query(query))[0]
return result.count
def _calculate_null_percentage(self, table_id):
# 모든 컬럼의 NULL 비율 계산
schema = self.client.get_table(table_id).schema
null_checks = []
for field in schema:
null_checks.append(f"COUNTIF({field.name} IS NULL)")
query = f"""
SELECT
({' + '.join(null_checks)}) as total_nulls,
COUNT(*) * {len(schema)} as total_cells
FROM `{table_id}`
"""
result = list(self.client.query(query))[0]
return (result.total_nulls / result.total_cells * 100) if result.total_cells > 0 else 0
def _calculate_quality_score(self, checks):
# 간단한 점수 계산 로직
score = 100
score -= min(checks['null_percentage'], 50) # NULL 비율만큼 차감
score -= min(checks['duplicate_percentage'], 30) # 중복 비율만큼 차감
if checks['data_freshness_hours'] > 24:
score -= 20 # 24시간 이상 오래된 데이터
return max(score, 0)
def _save_quality_results(self, table_id, results):
# 결과를 품질 모니터링 테이블에 저장
quality_data = [{
'table_id': table_id,
'check_timestamp': pd.Timestamp.now().isoformat(),
**results
}]
df = pd.DataFrame(quality_data)
self.client.load_table_from_dataframe(
df, 'project.monitoring.data_quality_results'
).result()
# 사용 예시
monitor = DataQualityMonitor('your-project')
quality_results = monitor.check_data_quality('project.data.customers')
print(f"Quality Score: {quality_results['quality_score']}")
6.3 자동화된 보고서 생성
class AutomatedReporting:
"""자동화된 보고서 생성 시스템"""
def __init__(self, project_id):
self.client = bigquery.Client(project=project_id)
def generate_sales_report(self, start_date, end_date):
"""매출 보고서 생성"""
queries = {
'daily_sales': f"""
SELECT
DATE(order_timestamp) as date,
SUM(amount) as daily_sales,
COUNT(*) as order_count,
COUNT(DISTINCT customer_id) as unique_customers
FROM `project.sales.orders`
WHERE DATE(order_timestamp) BETWEEN '{start_date}' AND '{end_date}'
GROUP BY date
ORDER BY date
""",
'product_performance': f"""
SELECT
product_name,
SUM(quantity) as units_sold,
SUM(amount) as revenue,
AVG(amount / quantity) as avg_unit_price
FROM `project.sales.orders` o
JOIN `project.catalog.products` p ON o.product_id = p.product_id
WHERE DATE(order_timestamp) BETWEEN '{start_date}' AND '{end_date}'
GROUP BY product_name
ORDER BY revenue DESC
LIMIT 20
""",
'customer_segments': f"""
WITH customer_stats AS (
SELECT
customer_id,
SUM(amount) as total_spent,
COUNT(*) as order_count
FROM `project.sales.orders`
WHERE DATE(order_timestamp) BETWEEN '{start_date}' AND '{end_date}'
GROUP BY customer_id
)
SELECT
CASE
WHEN total_spent >= 1000 THEN 'VIP'
WHEN total_spent >= 500 THEN 'Premium'
WHEN total_spent >= 100 THEN 'Regular'
ELSE 'Basic'
END as segment,
COUNT(*) as customer_count,
SUM(total_spent) as segment_revenue,
AVG(total_spent) as avg_customer_value
FROM customer_stats
GROUP BY segment
ORDER BY segment_revenue DESC
"""
}
report_data = {}
for report_name, query in queries.items():
df = self.client.query(query).result().to_dataframe()
report_data[report_name] = df
# 보고서 생성
report = self._create_report(report_data, start_date, end_date)
# 보고서 저장
self._save_report(report, f"sales_report_{start_date}_{end_date}")
return report
def _create_report(self, data, start_date, end_date):
"""HTML 보고서 생성"""
html = f"""
<html>
<head>
<title>Sales Report ({start_date} - {end_date})</title>
<style>
body
table
th, td
th
</style>
</head>
<body>
<h1>Sales Report</h1>
<p>Period: {start_date} to {end_date}</p>
<h2>Daily Sales Summary</h2>
{data['daily_sales'].to_html(index=False)}
<h2>Top Products</h2>
{data['product_performance'].to_html(index=False)}
<h2>Customer Segments</h2>
{data['customer_segments'].to_html(index=False)}
</body>
</html>
"""
return html
def _save_report(self, html_content, filename):
"""보고서를 Cloud Storage에 저장"""
# Cloud Storage 업로드 로직
with open(f'/tmp/{filename}.html', 'w') as f:
f.write(html_content)
print(f"Report saved: {filename}.html")
# 사용 예시
reporter = AutomatedReporting('your-project')
report = reporter.generate_sales_report('2024-01-01', '2024-01-31')
BigQuery API를 활용하면 프로그래밍 방식으로 복잡한 데이터 처리 워크플로우를 구축하고 자동화할 수 있습니다. 각 언어별 클라이언트 라이브러리와 REST API를 적절히 조합하여 효율적인 데이터 파이프라인을 만들어보세요.