중복 추천을 해결하기 위해 Redis 기반의 장소 캐싱 기능을 서비스 클래스로 설계하였고, 다른 팀원들도 쉽게 활용할 수 있도록 프로젝트 내에서 재사용할 수 있도록 공유했다.
이번 글에서는 Redis를 활용한 장소 추천 데이터 캐싱 및 중복 추천 방지 방법을 코드와 함께 자세히 설명하려고 한다.
SpotRedisService: Redis 기반 장소 관리 서비스
SpotRedisService 클래스는 Redis를 활용하여 사용자의 추천 장소 데이터를 관리하는 기능을 제공한다.
이 클래스는 장소 데이터를 저장, 조회, 삭제하는 기능을 제공하며, Redis의 Hash 자료구조를 활용하여 효율적으로 데이터를 관리한다.
먼저 완성된 코드는 다음과 같다. 콘솔 로그에 출력되는 정보가 많아 가독성을 높이기 위해 logger에 이모티콘을 추가했다.
spot_redis.py
from typing import List, Optional
from enum import Enum
import logging
import json
from redis.asyncio import Redis
from datetime import timedelta
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class SpotCategory(str, Enum):
CAFE = "cafe"
RESTAURANT = "restaurant"
SITE = "site"
ACCOMMODATION = "accommodation"
ALL = "all" # 초안 에이전트
class SpotRedisService:
"""Redis를 활용한 범용 장소 관리 서비스"""
EXPIRATION_HOURS = 5 / 60 # 5분
def __init__(self, redis_client: Redis):
"""
생성자에서 Redis 인스턴스를 주입받습니다.
:param redis: 의존성 주입된 Redis 인스턴스
"""
self.redis = redis_client
def _generate_key(self, member_id: str, main_location: str) -> str:
"""Redis 키 생성
Args:
member_id: 사용자 ID
main_location: 지역 정보
"""
return f"{member_id}:{main_location}"
async def add_spots(
self,
member_id: str,
category: SpotCategory,
main_location: str,
spots: List[str],
) -> None:
"""새로운 장소들을 Hash의 field에 Set으로 추가"""
try:
# 전달 받은 데이터 확인
logger.info(f"🟣 Received spots data: {spots}")
logger.info(f"🟣 Location: {main_location}")
logger.info(f"🟣 Category: {category}")
logger.info(f"🟣 Member ID: {member_id}")
key = self._generate_key(member_id, main_location)
# Redis 연결 확인
await self.redis.ping()
logger.info("Redis connection successful")
# 현재 field(category)에 저장된 데이터 가져오기
current_data = await self.redis.hget(key, category.value)
current_spots = set(json.loads(current_data)) if current_data else set()
# 새로운 spots 추가 (자동 중복 제거)
updated_spots = current_spots.union(set(spots))
# Hash에 저장 (field는 category)
await self.redis.hset(key, category.value, json.dumps(list(updated_spots)))
# 만료 시간 설정
await self.redis.expire(key, timedelta(hours=self.EXPIRATION_HOURS).seconds)
# redis 저장 완료 확인
logger.info(
f"🟣 Redis에 저장된 장소 수: {len(updated_spots)}, "
f"Key: {key}, Field: {category.value}"
)
logger.info(f"🟣 Redis에 저장된 장소들: {updated_spots}")
except Exception as e:
logger.error(
f"Redis 저장 중 오류 발생 - Key: {key}, Field: {category.value}, Error: {e}"
)
raise
async def get_spots(
self, member_id: str, category: SpotCategory, main_location: str
) -> List[str]:
"""Redis Hash에서 특정 field의 장소 목록 조회"""
try:
key = self._generate_key(member_id, main_location)
# Hash에서 특정 field(category) 데이터 조회
data = await self.redis.hget(key, category.value)
spots_list = json.loads(data) if data else []
# 조회된 데이터 확인
logger.info(
f"🟣 Redis에서 조회된 장소 수: {len(spots_list)}, "
f"Key: {key}, Field: {category.value}"
)
return spots_list
except Exception as e:
logger.error(
f"Redis 조회 중 오류 발생 - Key: {key}, Field: {category.value}, Error: {e}"
)
return []
async def clear_spots(
self, member_id: str, category: SpotCategory, main_location: str
) -> None:
"""특정 카테고리의 특정 field 삭제"""
try:
key = self._generate_key(member_id, main_location)
# Hash에서 특정 field(category) 삭제
await self.redis.hdel(key, category.value)
logger.info(
f"🟣 Redis 데이터 삭제 완료 - Key: {key}, Field: {category.value}"
)
except Exception as e:
logger.error(
f"Redis 데이터 삭제 중 오류 발생 - Key: {key}, Field: {category.value}, "
f"Error: {e}"
)
raise
# 사용 예시:
"""
redis_service = SpotRedisService(redis_client)
# 식당 추가
await redis_service.add_spots(
member_id="user123",
main_location="부산광역시-해운대구",
category=SpotCategory.RESTAURANT,
spots=["식당1", "식당2"]
)
# 식당 조회
restaurants = await redis_service.get_spots(
member_id="user123",
main_location="부산광역시-해운대구,"
category=SpotCategory.RESTAURANT,
)
# 식당 목록 삭제
await redis_service.clear_spots(
member_id="user123",
main_location="부산광역시-해운대구,"
category=SpotCategory.RESTAURANT
)
"""
주요 기능
- 장소 데이터 저장 (add_spots): 중복되지 않도록 기존 데이터와 합쳐서 저장
- 장소 데이터 조회 (get_spots): 사용자가 기존에 추천받은 장소 목록 조회
- 장소 데이터 삭제 (clear_spots): 특정 장소 데이터를 Redis에서 삭제
코드 설명
1. SpotRedisService 클래스 및 기본 설정
class SpotRedisService:
"""Redis를 활용한 범용 장소 관리 서비스"""
EXPIRATION_HOURS = 5 / 60 # 5분
def __init__(self, redis_client: Redis):
"""
생성자에서 Redis 인스턴스를 주입받습니다.
:param redis_client: 의존성 주입된 Redis 인스턴스
"""
self.redis = redis_client
- SpotRedisService는 Redis를 활용한 장소 데이터 관리 서비스 클래스
- EXPIRATION_HOURS → Redis 캐시의 만료 시간을 5분(5/60시간)으로 설정
- 생성자에서 redis_client를 받아 의존성 주입(DI, Dependency Injection) 방식으로 사용
2. _generate_key: Redis 키 생성 함수
def _generate_key(self, member_id: str, main_location: str) -> str:
"""Redis 키 생성
Args:
member_id: 사용자 ID
main_location: 지역 정보
"""
return f"{member_id}:{main_location}"
- Redis에서 데이터를 저장할 때 고유한 키를 생성하는 함수
- 키 구조 → "member_id:main_location" (예: "user123:부산광역시-해운대구")
- 이유 → 사용자와 지역별로 장소 추천을 구분하여 저장할 수 있도록 함
3. add_spots: 중복 없는 장소 데이터 저장
사용자가 새 장소 추천을 받을 때, 기존에 추천된 장소를 유지하면서 새로운 장소만 추가하여 저장한다.
→ 동일한 장소가 여러 번 추천되지 않도록 관리한다.
async def add_spots(
self,
member_id: str,
category: SpotCategory,
main_location: str,
spots: List[str],
) -> None:
"""새로운 장소들을 Hash의 field에 Set으로 추가"""
try:
# 전달 받은 데이터 확인
logger.info(f"🟣 Received spots data: {spots}")
logger.info(f"🟣 Location: {main_location}")
logger.info(f"🟣 Category: {category}")
logger.info(f"🟣 Member ID: {member_id}")
key = self._generate_key(member_id, main_location)
# Redis 연결 확인
await self.redis.ping()
logger.info("Redis connection successful")
# 현재 저장된 장소 데이터 가져오기
current_data = await self.redis.hget(key, category.value)
current_spots = set(json.loads(current_data)) if current_data else set()
# 새로운 spots 추가 (자동 중복 제거)
updated_spots = current_spots.union(set(spots))
# Hash에 저장 (field는 category)
await self.redis.hset(key, category.value, json.dumps(list(updated_spots)))
# 만료 시간 설정
await self.redis.expire(key, timedelta(hours=self.EXPIRATION_HOURS).seconds)
# redis 저장 완료 확인
logger.info(
f"🟣 Redis에 저장된 장소 수: {len(updated_spots)}, "
f"Key: {key}, Field: {category.value}"
)
logger.info(f"🟣 Redis에 저장된 장소들: {updated_spots}")
except Exception as e:
logger.error(
f"Redis 저장 중 오류 발생 - Key: {key}, Field: {category.value}, Error: {e}"
)
raise
- member_id, category, main_location을 기준으로 장소 데이터를 Redis에 저장한다.
- hget을 사용해 현재 저장된 장소 목록을 가져와 중복을 방지하고 새로운 데이터를 추가한다.
- hset을 통해 장소 데이터를 JSON 형태로 저장한다.
- expire를 사용해 5분 후 데이터가 자동 삭제되도록 설정.
- 중복 방지 핵심 로직: current_spots.union(set(spots))
저장된 결과
4. get_spots: 저장된 장소 목록 조회
기존에 추천된 장소를 확인하고, 새로운 추천을 생성할 때 중복을 방지하는 데 사용할 수 있다.
async def get_spots(
self, member_id: str, category: SpotCategory, main_location: str
) -> List[str]:
"""Redis Hash에서 특정 field의 장소 목록 조회"""
try:
key = self._generate_key(member_id, main_location)
# Hash에서 특정 field(category) 데이터 조회
data = await self.redis.hget(key, category.value)
spots_list = json.loads(data) if data else []
# 조회된 데이터 확인
logger.info(
f"🟣 Redis에서 조회된 장소 수: {len(spots_list)}, "
f"Key: {key}, Field: {category.value}"
)
return spots_list
except Exception as e:
logger.error(
f"Redis 조회 중 오류 발생 - Key: {key}, Field: {category.value}, Error: {e}"
)
return []
- hget을 사용하여 특정 사용자(member_id)와 지역(main_location)에 해당하는 장소 목록을 조회한다.
- Redis에 저장된 데이터가 없으면 빈 리스트를 반환한다.
5. clear_spots: 특정 장소 데이터 삭제
일정이 변경되거나 새로운 추천을 시작할 때 기존 데이터를 삭제하는 데 사용될 수 있다.
async def clear_spots(
self, member_id: str, category: SpotCategory, main_location: str
) -> None:
"""특정 카테고리의 특정 field 삭제"""
try:
key = self._generate_key(member_id, main_location)
# Hash에서 특정 field(category) 삭제
await self.redis.hdel(key, category.value)
logger.info(
f"🟣 Redis 데이터 삭제 완료 - Key: {key}, Field: {category.value}"
)
except Exception as e:
logger.error(
f"Redis 데이터 삭제 중 오류 발생 - Key: {key}, Field: {category.value}, "
f"Error: {e}"
)
raise
'Python' 카테고리의 다른 글
[Python] 1:1 문의 답변 이메일 전송하기(smtplib + Gmail) (0) | 2025.02.22 |
---|---|
[Python] Redis를 활용한 중복 추천 방지 로직 상세 분석 (0) | 2025.02.20 |
[Python] print와 logging, 디버깅할 때 무엇을 쓸까? (0) | 2025.02.17 |
[Python] 동기 vs 비동기, 언제 그리고 어떻게 사용해야 할까? (0) | 2025.02.07 |
[Python] FastAPI와 React 연동 및 데이터 흐름과 처리 과정 (0) | 2025.01.31 |