Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[9.0] DISET version of the TokenManager service #7793

Open
wants to merge 3 commits into
base: integration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/DIRAC/FrameworkSystem/Client/TokenManagerClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
"""
import time

from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Utilities import ThreadSafe
from DIRAC.Core.Utilities.DictCache import DictCache
from DIRAC.Core.Base.Client import Client, createClient
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from DIRAC.Resources.IdProvider.IdProviderFactory import IdProviderFactory
from DIRAC.FrameworkSystem.private.authorization.utils.Tokens import OAuth2Token
from DIRAC.FrameworkSystem.Utilities.TokenManagementUtilities import (
Expand Down
20 changes: 17 additions & 3 deletions src/DIRAC/FrameworkSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,22 @@ Services
storeHostInfo = Operator
}
}
##BEGIN TokenManager:
# Section to describe TokenManager service
TokenManager
{
Port = 9176
# Description of rules for access to methods
Authorization
{
# Settings by default:
Default = authenticated
getUsersTokensInfo = ProxyManagement
}
}
##END
##BEGIN TornadoTokenManager:
# Section to describe TokenManager system
# Section to describe TokenManager service
TornadoTokenManager
{
Protocol = https
Expand All @@ -40,7 +54,7 @@ Services
}
##END
##BEGIN ProxyManager:
# Section to describe ProxyManager system
# Section to describe ProxyManager service
# https://dirac.readthedocs.org/en/latest/AdministratorGuide/Systems/Framework/ProxyManager/index.html
ProxyManager
{
Expand All @@ -63,7 +77,7 @@ Services
}
##END
##BEGIN TornadoProxyManager:
# Section to describe ProxyManager system
# Section to describe ProxyManager service
# https://dirac.readthedocs.org/en/latest/AdministratorGuide/Systems/Framework/ProxyManager/index.html
TornadoProxyManager
{
Expand Down
285 changes: 285 additions & 0 deletions src/DIRAC/FrameworkSystem/Service/TokenManagerHandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
"""TokenManager service is responsible for token management, namely storing, updating,
requesting new tokens for DIRAC components that have the appropriate permissions.

.. literalinclude:: ../ConfigTemplate.cfg
:start-after: ##BEGIN TokenManager:
:end-before: ##END
:dedent: 2
:caption: TokenManager options

The most common use of this service is to obtain tokens with certain scope to return to the user for its purposes,
or to provide to the DIRAC service to perform asynchronous tasks on behalf of the user.
This is mainly about the :py:meth:`export_getToken` method.

.. image:: /_static/Systems/FS/TokenManager_getToken.png
:alt: https://dirac.readthedocs.io/en/integration/_images/TokenManager_getToken.png (source https://github.com/TaykYoku/DIRACIMGS/raw/main/TokenManagerService_getToken.ai)

The client has a mechanism for caching the received tokens.
This helps reduce the number of requests to both the service and the Identity Provider (IdP).

If the client has a valid **access token** in the cache, it is used until it expires.
After that you need to update. The client can update it independently if on the server where it is in ``dirac.cfg``
``client_id`` and ``client_secret`` of the Identity Provider client are registered.

Otherwise, the client makes an RPC call to the **TornadoManager** service.
The ``refresh token`` from :py:class:`TokenDB <DIRAC.FrameworkSystem.DB.TokenDB.TokenDB>`
is taken and the **exchange token** request to Identity Provider is made.
"""

import pprint

from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Security import Properties
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.FrameworkSystem.DB.TokenDB import TokenDB
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from DIRAC.Resources.IdProvider.IdProviderFactory import IdProviderFactory
from DIRAC.FrameworkSystem.Utilities.TokenManagementUtilities import (
getIdProviderClient,
getCachedKey,
)


class TokenManagerHandlerMixin:
DEFAULT_AUTHORIZATION = ["authenticated"]

@classmethod
def initializeHandler(cls, *args):
"""Initialization

:return: S_OK()/S_ERROR()
"""

# The service plays an important OAuth 2.0 role, namely it is an Identity Provider client.
# This allows you to manage tokens without the involvement of their owners.
cls.idps = IdProviderFactory()

# Let's try to connect to the database
try:
cls.__tokenDB = TokenDB(parentLogger=cls.log)
except Exception as e:
cls.log.exception(e)
return S_ERROR(f"Could not connect to the database {repr(e)}")

return S_OK()

auth_getUserTokensInfo = ["authenticated"]
types_getUserTokensInfo = []

def export_getUserTokensInfo(self):
"""Generate information dict about user tokens

:return: dict
"""
tokensInfo = []
credDict = self.getRemoteCredentials()
result = Registry.getDNForUsername(credDict["username"])
if not result["OK"]:
return result
for dn in result["Value"]:
result = Registry.getIDFromDN(dn)
if result["OK"]:
result = self.__tokenDB.getTokensByUserID(result["Value"])
if not result["OK"]:
return result
tokensInfo += result["Value"]
return S_OK(tokensInfo)

auth_getUsersTokensInfo = [Properties.PROXY_MANAGEMENT]
types_getUserTokensInfo = [list]

def export_getUsersTokensInfo(self, users: list):
"""Get the info about the user tokens in the database

:param users: user names

:return: S_OK(list) -- return list of tokens dictionaries
"""
tokensInfo = []
for user in users:
# Find the user ID among his DNs
result = Registry.getDNForUsername(user)
if not result["OK"]:
return result
for dn in result["Value"]:
uid = Registry.getIDFromDN(dn).get("Value")
if uid:
result = self.__tokenDB.getTokensByUserID(uid)
if not result["OK"]:
self.log.error(result["Message"])
else:
for tokenDict in result["Value"]:
if tokenDict not in tokensInfo:
# The database does not contain a username,
# as it is a unique user ID exclusively for DIRAC
# and is not associated with a token.
tokenDict["username"] = user
tokensInfo.append(tokenDict)
return S_OK(tokensInfo)

types_updateToken = [dict, str, str, int]

def export_updateToken(self, token: dict, userID: str, provider: str, rt_expired_in: int = 24 * 3600):
"""Using this method, you can transfer user tokens for storage in the TokenManager.

It is important to note that TokenManager saves only one token per user and, accordingly,
the Identity Provider from which it was issued. So when a new token is delegated,
keep in mind that the old token will be deleted.

:param token: token
:param userID: user ID
:param provider: provider name
:param rt_expired_in: refresh token expires time (in seconds)

:return: S_OK(list)/S_ERROR() -- list contain uploaded tokens info as dictionaries
"""
self.log.verbose(f"Update {userID} user token issued by {provider}:\n", pprint.pformat(token))
# prepare the client instance of the appropriate IdP to revoke the old tokens
result = self.idps.getIdProvider(provider)
if not result["OK"]:
return result
idPObj = result["Value"]
# overwrite old tokens with new ones
result = self.__tokenDB.updateToken(token, userID, provider, rt_expired_in)
if not result["OK"]:
return result
# revoke the old tokens
for oldToken in result["Value"]:
if "refresh_token" in oldToken and oldToken["refresh_token"] != token["refresh_token"]:
self.log.verbose("Revoke old refresh token:\n", pprint.pformat(oldToken))
idPObj.revokeToken(oldToken["refresh_token"])
# Let's return to the current situation with the storage of user tokens
return self.__tokenDB.getTokensByUserID(userID)

def __checkProperties(self, requestedUserDN: str, requestedUserGroup: str):
"""Check the properties and return if they can only download limited tokens if authorized

:param requestedUserDN: user DN
:param requestedUserGroup: DIRAC group

:return: S_OK(bool)/S_ERROR()
"""
credDict = self.getRemoteCredentials()
if Properties.FULL_DELEGATION in credDict["properties"]:
return S_OK(False)
if Properties.LIMITED_DELEGATION in credDict["properties"]:
return S_OK(True)
if Properties.PRIVATE_LIMITED_DELEGATION in credDict["properties"]:
if credDict["DN"] != requestedUserDN:
return S_ERROR("You are not allowed to download any token")
if Properties.PRIVATE_LIMITED_DELEGATION not in Registry.getPropertiesForGroup(requestedUserGroup):
return S_ERROR("You can't download tokens for that group")
return S_OK(True)
# Not authorized!
return S_ERROR("You can't get tokens!")

types_getToken = [None, None, None, None, None]

def export_getToken(
self,
username: str = None,
userGroup: str = None,
scope: list[str] = None,
audience: str = None,
identityProvider: str = None,
requiredTimeLeft: int = 0,
):
"""Get an access token for a user/group.

* Properties:
* FullDelegation <- permits full delegation of tokens
* LimitedDelegation <- permits downloading only limited tokens
* PrivateLimitedDelegation <- permits downloading only limited tokens for one self

:param username: user name
:param userGroup: user group
:param scope: requested scope
:param audience: requested audience
:param identityProvider: Identity Provider name
:param requiredTimeLeft: requested minimum life time

:return: S_OK(dict)/S_ERROR()
"""
# Get an IdProvider Client instance
result = getIdProviderClient(userGroup, identityProvider)
if not result["OK"]:
return result
idpObj = result["Value"]

# getCachedKey is just used here to resolve the default scopes
_, scope, *_ = getCachedKey(idpObj, username, userGroup, scope, audience)

# A client token is requested
if not username:
result = self.__checkProperties("", "")
if not result["OK"]:
return result

# Get the client token with requested scope and audience
result = idpObj.fetchToken(grant_type="client_credentials", scope=scope, audience=audience)
if result["OK"]:
result["Value"] = dict(result["Value"])

return result

# A user token is requested
err = []
# No luck so far, let's refresh the token stored in the database
result = Registry.getDNForUsername(username)
if not result["OK"]:
return result
for dn in result["Value"]:
# For backward compatibility, the user ID is written as DN. So let's check if this DN contains a user ID
result = Registry.getIDFromDN(dn)
if result["OK"]:
uid = result["Value"]
# To do this, first find the refresh token stored in the database with the maximum scope
result = self.__tokenDB.getTokenForUserProvider(uid, idpObj.name)
if result["OK"] and result["Value"]:
tokens = result["Value"]
result = self.__checkProperties(dn, userGroup)
if result["OK"]:
# refresh token with requested scope
result = idpObj.refreshToken(tokens.get("refresh_token"), group=userGroup, scope=scope)
if result["OK"]:
return result
Comment on lines +233 to +245
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
result = Registry.getIDFromDN(dn)
if result["OK"]:
uid = result["Value"]
# To do this, first find the refresh token stored in the database with the maximum scope
result = self.__tokenDB.getTokenForUserProvider(uid, idpObj.name)
if result["OK"] and result["Value"]:
tokens = result["Value"]
result = self.__checkProperties(dn, userGroup)
if result["OK"]:
# refresh token with requested scope
result = idpObj.refreshToken(tokens.get("refresh_token"), group=userGroup, scope=scope)
if result["OK"]:
return result
result = Registry.getIDFromDN(dn)
if not result["OK"]:
continue
uid = result["Value"]
# To do this, first find the refresh token stored in the database with the maximum scope
result = self.__tokenDB.getTokenForUserProvider(uid, idpObj.name)
if not result["OK"] or not result["Value"]:
continue
tokens = result["Value"]
result = self.__checkProperties(dn, userGroup)
if not result["OK"]:
continue
# refresh token with requested scope
result = idpObj.refreshToken(tokens.get("refresh_token"), group=userGroup, scope=scope)
if result["OK"]:
return result

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS: Shouldn't we first result = self.__checkProperties(dn, userGroup) instead of getting getTokenForUserProvider ? Seems we can do that checkProperties before?

# Did not find any token associated with the found user ID
err.append(result.get("Message", f"No token found for {uid}"))
# Collect all errors when trying to get a token, or if no user ID is registered
return S_ERROR("; ".join(err or [f"No user ID found for {username}"]))

types_deleteToken = [str]

def export_deleteToken(self, userDN: str):
"""Delete a token from the DB

:param userDN: user DN

:return: S_OK()/S_ERROR()
"""

# temporary ugly stuff to make it compliant with proxy management
userDN = f"/O=DIRAC/CN={userDN}"

# Delete it from cache
credDict = self.getRemoteCredentials()
if Properties.PROXY_MANAGEMENT not in credDict["properties"]:
if userDN != credDict["DN"]:
return S_ERROR("You aren't allowed!")
result = Registry.getIDFromDN(userDN)
return self.__tokenDB.removeToken(user_id=result["Value"]) if result["OK"] else result

types_getTokensByUserID = [str]

def export_getTokensByUserID(self, userID: str):
"""Retrieve a token from the DB

:param userID: user's token id

:return: S_OK(list)/S_ERROR() token row in dict format
"""
return self.__tokenDB.getTokensByUserID(userID)


class TokenManagerHandler(TokenManagerHandlerMixin, RequestHandler):
pass
Loading