Harmony/main/views.py

204 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import requests
from django.shortcuts import render,get_object_or_404
from django.http import HttpResponse, HttpResponseRedirect
import logging
from djangoProject1 import settings
from .models import Category, Product
from cart.forms import CartAddProductForm
from jose import jwt
from django.contrib import messages
# Redirect URI passed to the authorization-code exchange in `callback`.
# NOTE(review): hard-coded to a local development address; presumably it must
# match the redirect URI registered with the identity provider - confirm.
YOUR_REDIRECT_URI = 'http://127.0.0.1:8000/callback/'
def home(request):
    """Render the home page, which is the site's base template."""
    template_name = 'main/base.html'
    return render(request, template_name)
def product_detail(request, id, slug):
    """Render the detail page for a single available product.

    The product is looked up by both ``id`` and ``slug`` and must be marked
    available; otherwise ``get_object_or_404`` raises a 404. The template also
    receives an unbound add-to-cart form.

    The parameter name ``id`` shadows the builtin but is part of the view's
    signature (it is kept so existing callers are unaffected).
    """
    lookup = {'id': id, 'slug': slug, 'available': True}
    item = get_object_or_404(Product, **lookup)
    template_context = {
        'product': item,
        'cart_product_form': CartAddProductForm(),
    }
    return render(request, 'main/product/detail.html', template_context)
# View for the "About us" page.
def about(request):
    """Render the static "About us" template."""
    return render(request, template_name='main/about.html')
def exchange_code_for_token(authorization_code, redirect_uri, timeout=10):
    """Exchange an OAuth2 authorization code for an access token.

    Posts an ``authorization_code`` grant to the realm's OpenID Connect token
    endpoint and returns the ``access_token`` field of the JSON response.

    Args:
        authorization_code: The ``code`` value received on the redirect.
        redirect_uri: The redirect URI to send with the grant request.
        timeout: Seconds to wait for the token endpoint before giving up.
            Without a timeout ``requests`` waits indefinitely, which would
            hang the calling view.

    Returns:
        The access token string.

    Raises:
        Exception: If the endpoint answers with a non-200 status or the
            response carries no ``access_token``.
        requests.RequestException: On connection errors or timeouts.
    """
    token_endpoint = 'https://auth.myterior.kz/realms/Harmony/protocol/openid-connect/token'
    # SECURITY NOTE(review): client credentials are hard-coded in source.
    # They should be loaded from configuration (other helpers in this module
    # read settings.KEYCLOAK_CONFIG) and this secret rotated. Left unchanged
    # here because the configured client may differ from this one.
    client_id = 'admin-cli'
    client_secret = 'wOVphEiLVBS1AlNKRpaQpD4yQh5Wm3TJ'
    payload = {
        'grant_type': 'authorization_code',
        'code': authorization_code,
        'client_id': client_id,
        'client_secret': client_secret,
        'redirect_uri': redirect_uri,
    }
    response = requests.post(token_endpoint, data=payload, timeout=timeout)
    logger.debug("Token endpoint responded with HTTP status %s", response.status_code)
    if response.status_code != 200:
        logger.error(
            "Failed to exchange code: HTTP status %s. Response body: %s",
            response.status_code,
            response.text,
        )
        raise Exception('Failed to exchange authorization code for tokens')

    access_token = response.json().get('access_token')
    if not access_token:
        # A 200 without a token would otherwise hand None to the decoder.
        logger.error("Token endpoint response did not contain an access_token")
        raise Exception('Failed to exchange authorization code for tokens')
    return access_token
def get_user_info(access_token, timeout=10):
    """Fetch the user's claims from the realm's OpenID Connect userinfo endpoint.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds to wait for the endpoint; without it ``requests``
            would block indefinitely on an unresponsive server.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None`` (including when
        the request itself fails).
    """
    userinfo_endpoint = 'https://auth.myterior.kz/realms/Harmony/protocol/openid-connect/userinfo'
    headers = {'Authorization': f'Bearer {access_token}'}
    try:
        response = requests.get(userinfo_endpoint, headers=headers, timeout=timeout)
    except requests.RequestException:
        # Keep the "None on failure" contract for transport errors as well.
        logger.exception("Failed to fetch user info: request error")
        return None
    if response.status_code != 200:
        logger.error("Failed to fetch user info: HTTP status %s", response.status_code)
        return None
    return response.json()
import jwt
def decode_access_token(access_token):
    """Decode a JWT access token into its claims without verifying it.

    SECURITY NOTE(review): signature verification is explicitly disabled, so
    the returned claims are only as trustworthy as the channel the token came
    from. Confirm this is acceptable, or verify against the realm's keys.

    NOTE(review): ``jwt`` is bound twice in this file (``from jose import jwt``
    and a later ``import jwt``); the later binding is the one in effect here.
    """
    decode_options = {"verify_signature": False}
    return jwt.decode(access_token, options=decode_options)
def callback(request, category_slug=None):
    """Handle the login redirect and render the product list.

    If the request carries a ``code`` query parameter, it is exchanged for an
    access token, the token's claims are decoded and stored in the session
    under ``user_info``. The product list is then rendered, optionally
    filtered by ``category_slug`` (404 if the category does not exist).

    Template context keys: ``category``, ``categories``, ``products``, plus
    ``userinfo`` on a successful login or ``error`` describing why no login
    took place. Previously the ``userinfo``/``error`` values were computed but
    never passed to the template.
    """
    context = {}
    # Get the authorization code from the request.
    authorization_code = request.GET.get('code')
    if authorization_code:
        try:
            access_token = exchange_code_for_token(authorization_code, YOUR_REDIRECT_URI)
            user_info = decode_access_token(access_token)
        except Exception as e:
            # Login failure must not break the catalogue page; record it so
            # the template can show it and operators can see it in the logs.
            logger.exception("Authorization code exchange failed")
            context['error'] = str(e)
        else:
            request.session['user_info'] = user_info
            context['userinfo'] = user_info
    else:
        context['error'] = "Authorization code not found."

    category = None
    categories = Category.objects.all()
    products = Product.objects.filter(available=True)
    if category_slug:
        category = get_object_or_404(Category, slug=category_slug)
        products = products.filter(category=category)

    context.update({
        'category': category,
        'categories': categories,
        'products': products,
    })
    return render(request, 'main/product/list.html', context)
from django.shortcuts import render, redirect
def profile(request):
    """Show the profile stored in the session and apply edits on POST.

    On POST, the ``first_name``, ``last_name``, ``username`` and ``email``
    form fields overwrite the matching claims in the session's ``user_info``
    dict; a field missing from the form keeps its current value (or ``''``).
    Only the session copy is changed.

    The template always receives ``userinfo``. Previously the POST branch
    rendered it without any context, so the values just saved were missing
    from the page.
    """
    user_info = request.session.get('user_info', {})
    if request.method == 'POST':
        # (claim name in user_info, form field name in the POST data)
        editable_fields = (
            ('given_name', 'first_name'),
            ('family_name', 'last_name'),
            ('preferred_username', 'username'),
            ('email', 'email'),
        )
        for claim, form_field in editable_fields:
            user_info[claim] = request.POST.get(form_field, user_info.get(claim, ''))
        # Re-assign so the session is marked as modified and gets saved.
        request.session['user_info'] = user_info
    return render(request, 'main/callback.html', {'userinfo': user_info})
# Set up logging: module-level logger named after this module.
logger = logging.getLogger(__name__)
def get_keycloak_admin_token(timeout=10):
    """Obtain an access token via the client-credentials grant.

    Uses ``CLIENT_ID``, ``CLIENT_SECRET``, ``SERVER_URL`` and ``REALM`` from
    ``settings.KEYCLOAK_CONFIG``.

    Args:
        timeout: Seconds to wait for the token endpoint.

    Returns:
        The access token string, or ``None`` if the request fails, the server
        answers with a non-200 status, or the response has no
        ``access_token``. Callers in this module test the result for
        falsiness, so failures are reported as ``None`` rather than raised.
    """
    config = settings.KEYCLOAK_CONFIG
    data = {
        'client_id': config['CLIENT_ID'],
        'client_secret': config['CLIENT_SECRET'],
        'grant_type': 'client_credentials',
    }
    token_url = (
        f"{config['SERVER_URL']}/realms/{config['REALM']}"
        "/protocol/openid-connect/token"
    )
    try:
        response = requests.post(token_url, data=data, timeout=timeout)
        if response.status_code != 200:
            logger.error("Error obtaining admin token: %s", response.status_code)
            return None
        return response.json().get('access_token')
    except (requests.RequestException, ValueError):
        # ValueError covers a body that is not valid JSON.
        logger.exception("Error obtaining admin token")
        return None
def get_user_sessions(admin_token, user_id, timeout=10):
    """Fetch the sessions listed for a user by the realm's admin REST API.

    Args:
        admin_token: Bearer token sent in the ``Authorization`` header.
        user_id: Identifier placed in the ``/users/{user_id}/sessions`` path.
        timeout: Seconds to wait for the server.

    Returns:
        The decoded JSON body on HTTP 200; an empty list on any failure
        (non-200 status, transport error, or undecodable body).
    """
    config = settings.KEYCLOAK_CONFIG
    headers = {'Authorization': f'Bearer {admin_token}'}
    sessions_url = (
        f"{config['SERVER_URL']}/admin/realms/{config['REALM']}"
        f"/users/{user_id}/sessions"
    )
    try:
        response = requests.get(sessions_url, headers=headers, timeout=timeout)
        if response.status_code != 200:
            logger.error("Error fetching user sessions: %s", response.status_code)
            return []
        return response.json()
    except (requests.RequestException, ValueError):
        # Keep the "empty list on failure" contract for transport/JSON errors.
        logger.exception("Error fetching user sessions")
        return []
def logout_user(admin_token, session_id, timeout=10):
    """Delete one session through the realm's admin REST API.

    Args:
        admin_token: Bearer token sent in the ``Authorization`` header.
        session_id: Identifier placed in the ``/sessions/{session_id}`` path.
        timeout: Seconds to wait for the server.

    Returns:
        ``True`` if the server answered HTTP 204, ``False`` otherwise
        (including when the request itself fails, so a caller looping over
        several sessions is not aborted midway).
    """
    config = settings.KEYCLOAK_CONFIG
    headers = {
        'Authorization': f'Bearer {admin_token}',
    }
    session_url = (
        f"{config['SERVER_URL']}/admin/realms/{config['REALM']}"
        f"/sessions/{session_id}"
    )
    try:
        response = requests.delete(session_url, headers=headers, timeout=timeout)
    except requests.RequestException:
        logger.exception("Error deleting session %s", session_id)
        return False
    return response.status_code == 204
def get_user_id_from_session(request):
    """Return the ``sub`` claim stored in the session's ``user_info``.

    Returns the value only when it is a string; otherwise (missing or of
    another type) an error is logged and ``None`` is returned.
    """
    user_id = request.session.get('user_info', {}).get('sub')
    if not isinstance(user_id, str):
        logger.error(f"Invalid user ID format in session: {user_id}")
        return None
    return user_id
def admin_logout_user(request):
    """Log the current user out of all remote sessions and the Django session.

    Steps:
      1. Read the user id from the session; if there is none, no remote call
         is made (previously a request was sent with ``None`` in the URL).
      2. Obtain an admin token; on failure redirect to
         ``settings.KEYCLOAK_CONFIG['LOGOUT_REDIRECT_URL']``.
      3. Fetch the user's sessions and delete each one, reporting full or
         partial success through the messages framework.
      4. Flush the Django session and render the logout page. The local
         session is now also flushed when no remote sessions are found, so
         the logout page never leaves the user logged in locally.
    """
    user_id = get_user_id_from_session(request)
    if user_id is None:
        logger.error("Can't find user sessions.")
        messages.error(request, "No active sessions found for user.")
        request.session.flush()  # Clear Django session
        return render(request, 'main/logout.html')

    admin_token = get_keycloak_admin_token()
    if not admin_token:
        logger.error("Can't obtain admin token.")
        messages.error(request, "Can't obtain admin token for logout.")
        return redirect(settings.KEYCLOAK_CONFIG['LOGOUT_REDIRECT_URL'])

    sessions = get_user_sessions(admin_token, user_id)
    if not sessions:
        logger.error("Can't find user sessions.")
        messages.error(request, "No active sessions found for user.")
        request.session.flush()  # Clear Django session
        return render(request, 'main/logout.html')

    all_sessions_logged_out = True
    for session in sessions:
        # Try every session even if one fails, so as many as possible end.
        if not logout_user(admin_token, session['id']):
            all_sessions_logged_out = False
            logger.error("Failed to log out session %s", session['id'])

    if all_sessions_logged_out:
        logger.info("User successfully logged out from all Keycloak sessions.")
        messages.success(request, "Successfully logged out.")
    else:
        messages.warning(request, "Partial logout. Some sessions may still be active.")

    request.session.flush()  # Clear Django session
    return render(request, 'main/logout.html')