"""Shop views plus the Keycloak (OpenID Connect) login/logout helpers.

The module renders the product catalogue and keeps the decoded Keycloak
access-token claims in ``request.session['user_info']``.
"""
import logging

import requests
from django.contrib import messages
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
# NOTE: both imports bind the name ``jwt``. The PyJWT import must stay last so
# that ``jwt.decode(token, options=...)`` below resolves to PyJWT, exactly as
# it did before the imports were hoisted to the top of the file.
from jose import jwt  # noqa: F401
import jwt  # noqa: F811

from djangoProject1 import settings
from .models import Category, Product
from cart.forms import CartAddProductForm

# Configure logging.
logger = logging.getLogger(__name__)

# Seconds to wait for Keycloak before giving up. ``requests`` has no default
# timeout, so without this a stalled identity provider blocks the worker.
KEYCLOAK_TIMEOUT = 10

# (session key, POST field) pairs editable through the profile form.
_PROFILE_FIELDS = (
    ('given_name', 'first_name'),
    ('family_name', 'last_name'),
    ('preferred_username', 'username'),
    ('email', 'email'),
)


def product_detail(request, id, slug):
    """Render the detail page of one available product.

    ``id`` shadows the builtin, but it is the keyword the URL pattern passes
    in, so the name is part of the view's interface. Responds with 404 when
    no available product matches both ``id`` and ``slug``.
    """
    product = get_object_or_404(Product, id=id, slug=slug, available=True)
    cart_product_form = CartAddProductForm()
    return render(
        request,
        'main/product/detail.html',
        {'product': product, 'cart_product_form': cart_product_form},
    )


# View for the "About us" page.
def about(request):
    """Render the static "About us" page."""
    return render(request, 'main/about.html')


def exchange_code_for_token(authorization_code, redirect_uri):
    """Exchange an OAuth2 authorization code for an access token.

    Args:
        authorization_code: The ``code`` query parameter Keycloak sent back.
        redirect_uri: The redirect URI used when the code was requested.

    Returns:
        The ``access_token`` field of the token response (``None`` if the
        response has no such field).

    Raises:
        Exception: If the token endpoint answers with a non-200 status.
        requests.RequestException: On transport errors or timeout.
    """
    token_endpoint = 'https://auth.myterior.kz/realms/Harmony/protocol/openid-connect/token'
    # SECURITY: these credentials are committed to source control. They should
    # be rotated and loaded from settings / the environment instead.
    # TODO(review): confirm whether settings.KEYCLOAK_CONFIG['CLIENT_ID'] and
    # ['CLIENT_SECRET'] describe this same client before switching over.
    client_id = 'admin-cli'
    client_secret = 'wOVphEiLVBS1AlNKRpaQpD4yQh5Wm3TJ'

    payload = {
        'grant_type': 'authorization_code',
        'code': authorization_code,
        'client_id': client_id,
        'client_secret': client_secret,
        'redirect_uri': redirect_uri,
    }

    response = requests.post(token_endpoint, data=payload, timeout=KEYCLOAK_TIMEOUT)
    logger.debug("Token endpoint answered with HTTP status %s", response.status_code)

    if response.status_code != 200:
        logger.error("Failed to exchange code: HTTP status %s", response.status_code)
        logger.error("Response body: %s", response.text)
        raise Exception('Failed to exchange authorization code for tokens')

    return response.json().get('access_token')


def get_user_info(access_token):
    """Fetch the OpenID Connect userinfo document for ``access_token``.

    Returns:
        The parsed JSON body on HTTP 200, otherwise ``None`` (including when
        the request itself fails or times out).
    """
    userinfo_endpoint = 'https://auth.myterior.kz/realms/Harmony/protocol/openid-connect/userinfo'
    headers = {'Authorization': f'Bearer {access_token}'}

    try:
        response = requests.get(userinfo_endpoint, headers=headers, timeout=KEYCLOAK_TIMEOUT)
    except requests.RequestException:
        logger.exception("Failed to fetch user info: request error")
        return None

    if response.status_code != 200:
        logger.error("Failed to fetch user info: HTTP status %s", response.status_code)
        return None
    return response.json()


def decode_access_token(access_token):
    """Return the claims of ``access_token`` WITHOUT verifying its signature.

    SECURITY: because the signature is not checked, this must only be fed
    tokens obtained directly from the token endpoint, never a token supplied
    by the browser.
    """
    payload = jwt.decode(access_token, options={"verify_signature": False})
    return payload


def callback(request, category_slug=None):
    """Handle the Keycloak redirect and render the product list.

    When the request carries a ``code`` query parameter it is exchanged for
    an access token and the decoded claims are stored in
    ``request.session['user_info']``. The product list is rendered in every
    case, optionally narrowed to ``category_slug``.

    Template context: ``category``, ``categories``, ``products``,
    ``userinfo`` (session claims, ``{}`` when anonymous) and ``error``
    (message of a failed login attempt, else ``None``).
    """
    error = None

    # Get the authorization code from the request.
    authorization_code = request.GET.get('code')
    if authorization_code:
        try:
            access_token = exchange_code_for_token(
                authorization_code,
                settings.KEYCLOAK_CONFIG['CALLBACK_URL'],
            )
            request.session['user_info'] = decode_access_token(access_token)
        except Exception as exc:
            # View boundary: a failed login must not break the catalogue
            # page, but it must leave a trace in the logs.
            logger.exception("Keycloak login failed")
            error = str(exc)
    # A missing code is not an error here: this view also serves ordinary
    # catalogue requests that never went through Keycloak.

    category = None
    categories = Category.objects.all()
    products = Product.objects.filter(available=True)
    if category_slug:
        category = get_object_or_404(Category, slug=category_slug)
        products = products.filter(category=category)

    return render(
        request,
        'main/product/list.html',
        {
            'category': category,
            'categories': categories,
            'products': products,
            'userinfo': request.session.get('user_info', {}),
            'error': error,
        },
    )


def profile(request):
    """Show the profile page and, on POST, update the session's user info.

    Only the copy held in the Django session is changed; nothing in this
    view sends the new values to Keycloak. Fields missing from the POST body
    keep their current value.
    """
    user_info = request.session.get('user_info', {})

    if request.method == 'POST':
        for session_key, form_field in _PROFILE_FIELDS:
            user_info[session_key] = request.POST.get(
                form_field, user_info.get(session_key, '')
            )
        request.session['user_info'] = user_info

    # Both branches pass the data to the template so the page still shows
    # the profile right after it has been saved.
    return render(request, 'main/callback.html', {'userinfo': user_info})


def get_keycloak_admin_token():
    """Obtain a client-credentials token for the Keycloak admin API.

    Returns:
        The access token string, or ``None`` when it could not be obtained
        (transport error, non-200 answer, malformed or incomplete body).
    """
    config = settings.KEYCLOAK_CONFIG
    data = {
        'client_id': config['CLIENT_ID'],
        'client_secret': config['CLIENT_SECRET'],
        'grant_type': 'client_credentials',
    }
    url = f"{config['SERVER_URL']}/realms/{config['REALM']}/protocol/openid-connect/token"

    try:
        response = requests.post(url, data=data, timeout=KEYCLOAK_TIMEOUT)
        if response.status_code != 200:
            logger.error("Error obtaining admin token: %s", response.status_code)
            return None
        return response.json().get('access_token')
    except (requests.RequestException, ValueError):
        # ValueError covers a 200 answer whose body is not valid JSON.
        logger.exception("Error obtaining admin token")
        return None


def get_user_sessions(admin_token, user_id):
    """List the Keycloak sessions of ``user_id``.

    Returns:
        The parsed JSON body on HTTP 200, otherwise an empty list.
    """
    config = settings.KEYCLOAK_CONFIG
    headers = {'Authorization': f'Bearer {admin_token}'}
    url = f"{config['SERVER_URL']}/admin/realms/{config['REALM']}/users/{user_id}/sessions"

    try:
        response = requests.get(url, headers=headers, timeout=KEYCLOAK_TIMEOUT)
    except requests.RequestException:
        logger.exception("Error fetching user sessions")
        return []

    if response.status_code != 200:
        logger.error("Error fetching user sessions: %s", response.status_code)
        return []
    return response.json()


def logout_user(admin_token, session_id):
    """Delete one Keycloak session.

    Returns:
        ``True`` when Keycloak answers 204, ``False`` otherwise.
    """
    config = settings.KEYCLOAK_CONFIG
    headers = {
        'Authorization': f'Bearer {admin_token}',
    }
    url = f"{config['SERVER_URL']}/admin/realms/{config['REALM']}/sessions/{session_id}"

    try:
        response = requests.delete(url, headers=headers, timeout=KEYCLOAK_TIMEOUT)
    except requests.RequestException:
        logger.exception("Error deleting session %s", session_id)
        return False
    return response.status_code == 204


def get_user_id_from_session(request):
    """Return the ``sub`` claim stored in the session, or ``None``.

    A missing claim just means the visitor is anonymous; an error is logged
    only when a value is present but is not a string.
    """
    user_info = request.session.get('user_info', {})
    user_id = user_info.get('sub')
    if isinstance(user_id, str):
        return user_id
    if user_id is not None:
        logger.error("Invalid user ID format in session: %s", user_id)
    return None


def admin_logout_user(request, category_slug=None):
    """Terminate all Keycloak sessions of the current user and log out locally.

    The Django session is flushed only after the Keycloak sessions have been
    processed. Every outcome is reported through the messages framework and
    ends in a redirect.
    """
    callback_url = settings.KEYCLOAK_CONFIG['CALLBACK_URL']

    user_id = get_user_id_from_session(request)
    if user_id is None:
        # Nobody is logged in: do not query the admin API for "users/None".
        # The message and redirect match the no-sessions outcome below.
        logger.error("Can't find user sessions.")
        messages.error(request, "No active sessions found for user.")
        return redirect(callback_url)

    admin_token = get_keycloak_admin_token()
    if not admin_token:
        logger.error("Can't obtain admin token.")
        messages.error(request, "Can't obtain admin token for logout.")
        return redirect(settings.KEYCLOAK_CONFIG['LOGOUT_REDIRECT_URL'])

    sessions = get_user_sessions(admin_token, user_id)
    if not sessions:
        logger.error("Can't find user sessions.")
        messages.error(request, "No active sessions found for user.")
        return redirect(callback_url)

    all_sessions_logged_out = True
    for session in sessions:
        if not logout_user(admin_token, session['id']):
            all_sessions_logged_out = False
            logger.error("Failed to log out session %s", session['id'])

    if all_sessions_logged_out:
        logger.info("User successfully logged out from all Keycloak sessions.")
        messages.success(request, "Successfully logged out.")
    else:
        messages.warning(request, "Partial logout. Some sessions may still be active.")

    request.session.flush()
    return redirect(callback_url)


def check_user_authenticated(request):
    """Report as JSON whether the session holds a valid user id."""
    user_id = get_user_id_from_session(request)
    is_authenticated = user_id is not None
    return JsonResponse({'isAuthenticated': is_authenticated})