262 lines
14 KiB
Python
262 lines
14 KiB
Python
import requests
|
||
from django.shortcuts import get_object_or_404
|
||
import logging
|
||
from djangoProject1 import settings
|
||
from .models import Category, Product
|
||
from cart.forms import CartAddProductForm
|
||
from jose import jwt
|
||
from django.contrib import messages
|
||
import jwt
|
||
from django.http import JsonResponse
|
||
|
||
|
||
|
||
|
||
def product_detail(request, id, slug):
|
||
product = get_object_or_404(Product,
|
||
id=id,
|
||
slug=slug,
|
||
available=True)
|
||
cart_product_form = CartAddProductForm()
|
||
|
||
return render(request,
|
||
'main/product/detail.html',
|
||
{'product': product,
|
||
'cart_product_form': cart_product_form})
|
||
|
||
|
||
# Функция для страницы "О нас"
|
||
def about(request):
|
||
return render(request, 'main/about.html')
|
||
|
||
|
||
#Получение access token пользователя
|
||
def exchange_code_for_token(authorization_code, redirect_uri):
|
||
token_endpoint = 'https://auth.myterior.kz/realms/Harmony/protocol/openid-connect/token'
|
||
client_id = 'admin-cli'
|
||
client_secret = 'wOVphEiLVBS1AlNKRpaQpD4yQh5Wm3TJ'
|
||
|
||
payload = {
|
||
'grant_type': 'authorization_code',
|
||
'code': authorization_code,
|
||
'client_id': client_id,
|
||
'client_secret': client_secret,
|
||
'redirect_uri': redirect_uri,
|
||
}
|
||
|
||
response = requests.post(token_endpoint, data=payload)
|
||
print(response.status_code)
|
||
if response.status_code == 200:
|
||
tokens = response.json()
|
||
access_token = tokens.get('access_token')
|
||
return access_token
|
||
else:
|
||
print("Failed to exchange code: HTTP status", response.status_code)
|
||
print("Response body:", response.text)
|
||
raise Exception('Failed to exchange authorization code for tokens')
|
||
|
||
#Через access_token получать данные о пользователе декодировав его в json
|
||
def get_user_info(access_token):
|
||
userinfo_endpoint = 'https://auth.myterior.kz/realms/Harmony/protocol/openid-connect/userinfo'
|
||
headers = {'Authorization': f'Bearer {access_token}'}
|
||
response = requests.get(userinfo_endpoint, headers=headers)
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
else:
|
||
print("Failed to fetch user info: HTTP status", response.status_code)
|
||
return None
|
||
|
||
|
||
def decode_access_token(access_token):
|
||
|
||
payload = jwt.decode(access_token, options={"verify_signature": False})
|
||
return payload
|
||
|
||
|
||
def callback(request, category_slug=None):
|
||
context = {}
|
||
# Получение кода авторизации из запроса.
|
||
# где клиент получает код после аутентификации пользователя на сервере.
|
||
authorization_code = request.GET.get('code')
|
||
if authorization_code:
|
||
try:
|
||
# Обмен кода на access_token с помощью функции exchange_code_for_token.
|
||
# Этот токен необходим для доступа к защищенным ресурсам (например, к информации о пользователе)
|
||
# на сервере аутентификации.
|
||
access_token = exchange_code_for_token(authorization_code, settings.KEYCLOAK_CONFIG['CALLBACK_URL'])
|
||
# Декодирование access_token для получения информации о пользователе.
|
||
# Это может включать имя пользователя, email и другие данные, в зависимости от настроек сервера.
|
||
user_info = decode_access_token(access_token)
|
||
# Сохранение полученной информации о пользователе в сессии.
|
||
# Это позволяет использовать данные пользователя в дальнейшем, не обращаясь к серверу аутентификации.
|
||
request.session['user_info'] = user_info
|
||
|
||
# Добавление информации о пользователе в контекст для отображения на странице.
|
||
context['userinfo'] = request.session.get('user_info', {})
|
||
except Exception as e:
|
||
# В случае ошибки (например, проблемы с обменом кода или декодированием токена)
|
||
# добавляем информацию об ошибке в контекст.
|
||
context['error'] = str(e)
|
||
else:
|
||
# Если код авторизации не найден в запросе, сообщаем об этом через контекст.
|
||
context['error'] = "Authorization code not found."
|
||
|
||
# Загрузка всех категорий и продуктов для отображения на странице.
|
||
# Если указан category_slug, фильтруем продукты по данной категории.
|
||
category = None
|
||
categories = Category.objects.all()
|
||
products = Product.objects.filter(available=True)
|
||
if category_slug:
|
||
category = get_object_or_404(Category, slug=category_slug)
|
||
products = products.filter(category=category)
|
||
|
||
# Рендеринг страницы с продуктами, передавая информацию о категориях, продуктах и контекст с данными пользователя или ошибками.
|
||
return render(request, 'main/product/list.html',
|
||
{'category': category, 'categories': categories, 'products': products})
|
||
|
||
|
||
from django.shortcuts import render, redirect
|
||
|
||
def profile(request):
|
||
# Проверка метода запроса: если POST, значит пользователь отправил форму для обновления своих данных.
|
||
if request.method == 'POST':
|
||
# Получение текущей информации о пользователе из сессии.
|
||
user_info = request.session.get('user_info', {})
|
||
|
||
# Обновление информации о пользователе данными из формы.
|
||
# Если какое-то поле не было предоставлено, используется текущее значение из сессии.
|
||
user_info['given_name'] = request.POST.get('first_name', user_info.get('given_name', ''))
|
||
user_info['family_name'] = request.POST.get('last_name', user_info.get('family_name', ''))
|
||
user_info['preferred_username'] = request.POST.get('username', user_info.get('preferred_username', ''))
|
||
user_info['email'] = request.POST.get('email', user_info.get('email', ''))
|
||
|
||
# Сохранение обновленной информации о пользователе обратно в сессию.
|
||
request.session['user_info'] = user_info
|
||
|
||
# После обновления информации редиректим пользователя на страницу callback,
|
||
# которая, скорее всего, отображает информацию о профиле или подтверждение об обновлении данных.
|
||
return render(request, 'main/callback.html')
|
||
else:
|
||
# Если метод запроса не POST, пользователь просто посещает страницу профиля,
|
||
# мы загружаем текущую информацию о пользователе из сессии и передаем ее в контекст шаблона.
|
||
context = {'userinfo': request.session.get('user_info', {})}
|
||
|
||
# Отображаем страницу профиля пользователя, передавая информацию о пользователе в контекст.
|
||
return render(request, 'main/callback.html', context)
|
||
|
||
|
||
|
||
# логирование
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def get_keycloak_admin_token():
|
||
# Формируем данные для запроса токена
|
||
data = {
|
||
'client_id': settings.KEYCLOAK_CONFIG['CLIENT_ID'], # ID клиента
|
||
'client_secret': settings.KEYCLOAK_CONFIG['CLIENT_SECRET'], # Секрет клиента
|
||
'grant_type': 'client_credentials', # Тип авторизации
|
||
}
|
||
|
||
# Отправка запроса для получения токена администратора Keycloak
|
||
response = requests.post(
|
||
f"{settings.KEYCLOAK_CONFIG['SERVER_URL']}/realms/{settings.KEYCLOAK_CONFIG['REALM']}/protocol/openid-connect/token",
|
||
data=data
|
||
)
|
||
# Возвращаем токен доступа из ответа
|
||
return response.json()['access_token']
|
||
|
||
|
||
def get_user_sessions(admin_token, user_id):
|
||
# Установка заголовков с токеном администратора для авторизации запроса
|
||
headers = {'Authorization': f'Bearer {admin_token}'}
|
||
# Запрос на получение списка сессий пользователя в Keycloak
|
||
response = requests.get(
|
||
f"{settings.KEYCLOAK_CONFIG['SERVER_URL']}/admin/realms/{settings.KEYCLOAK_CONFIG['REALM']}/users/{user_id}/sessions",
|
||
headers=headers
|
||
)
|
||
# Проверка успешности запроса и возвращение результата
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
else:
|
||
logger.error(f"Error fetching user sessions: {response.status_code}")
|
||
return []
|
||
|
||
def logout_user(admin_token, session_id):
|
||
# Установка заголовков с токеном администратора для авторизации запроса
|
||
headers = {'Authorization': f'Bearer {admin_token}'}
|
||
# Отправка запроса на удаление сессии пользователя по ID сессии
|
||
response = requests.delete(
|
||
f"{settings.KEYCLOAK_CONFIG['SERVER_URL']}/admin/realms/{settings.KEYCLOAK_CONFIG['REALM']}/sessions/{session_id}",
|
||
headers=headers
|
||
)
|
||
# Проверка успешности запроса на удаление
|
||
return response.status_code == 204
|
||
|
||
def get_user_id_from_session(request):
|
||
# Получение информации о пользователе из сессии
|
||
user_info = request.session.get('user_info', {})
|
||
user_id = user_info.get('sub') # 'sub' обычно используется как идентификатор пользователя
|
||
# Проверка валидности формата ID пользователя
|
||
if isinstance(user_id, str):
|
||
return user_id
|
||
else:
|
||
logger.error(f"Invalid user ID format in session: {user_id}")
|
||
return None
|
||
|
||
|
||
|
||
def admin_logout_user(request, category_slug=None):
|
||
# Получение ID пользователя из сессии текущего запроса.
|
||
user_id = get_user_id_from_session(request)
|
||
|
||
# Получение административного токена для обращения к Keycloak API.
|
||
admin_token = get_keycloak_admin_token()
|
||
if not admin_token:
|
||
# Логгирование ошибки, если не удалось получить токен администратора.
|
||
logger.error("Can't obtain admin token.")
|
||
# Отображение сообщения об ошибке пользователю.
|
||
messages.error(request, "Can't obtain admin token for logout.")
|
||
# Перенаправление на страницу выхода/авторизации в случае ошибки.
|
||
return redirect(settings.KEYCLOAK_CONFIG['LOGOUT_REDIRECT_URL'])
|
||
|
||
# Получение списка активных сессий пользователя через Keycloak API.
|
||
sessions = get_user_sessions(admin_token, user_id)
|
||
if sessions is None or not sessions:
|
||
# Логгирование, если сессии не найдены.
|
||
logger.error("Can't find user sessions.")
|
||
# Уведомление пользователя о невозможности найти активные сессии.
|
||
messages.error(request, "No active sessions found for user.")
|
||
# Перенаправление на callback URL.
|
||
return redirect(settings.KEYCLOAK_CONFIG['CALLBACK_URL'])
|
||
|
||
# Флаг, указывающий на успешный выход из всех сессий.
|
||
all_sessions_logged_out = True
|
||
for session in sessions:
|
||
# Попытка выхода из каждой сессии через Keycloak API.
|
||
if not logout_user(admin_token, session['id']):
|
||
# Если выход из какой-либо сессии не удался, флаг устанавливается в False.
|
||
all_sessions_logged_out = False
|
||
logger.error(f"Failed to log out session {session['id']}")
|
||
|
||
if all_sessions_logged_out:
|
||
# Логгирование успешного выхода из всех сессий.
|
||
logger.info("User successfully logged out from all Keycloak sessions.")
|
||
# Уведомление пользователя о успешном выходе.
|
||
messages.success(request, "Successfully logged out.")
|
||
else:
|
||
# Уведомление пользователя, если выход был выполнен не из всех сессий.
|
||
messages.warning(request, "Partial logout. Some sessions may still be active.")
|
||
|
||
# Очистка сессии Django после выхода пользователя.
|
||
request.session.flush()
|
||
# Перенаправление пользователя на callback URL после выхода.
|
||
return redirect(settings.KEYCLOAK_CONFIG['CALLBACK_URL'])
|
||
|
||
|
||
|
||
|
||
def check_user_authenticated(request):
|
||
user_id = get_user_id_from_session(request)
|
||
is_authenticated = user_id is not None
|
||
return JsonResponse({'isAuthenticated': is_authenticated})
|