Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support SDM Subscription and Unsubscription for UE Session #123

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type SMFContext struct {

// Each pdu session should have a unique charging id
ChargingIDGenerator *idgenerator.IDGenerator

Ues *Ues
}

func GenerateChargingID() int32 {
Expand Down Expand Up @@ -248,6 +250,8 @@ func InitSmfContext(config *factory.Config) {
smfContext.Locality = configuration.Locality

TeidGenerator = idgenerator.NewGenerator(1, math.MaxUint32)

smfContext.Ues = InitSmfUeData()
}

func InitSMFUERouting(routingConfig *factory.RoutingConfig) {
Expand Down
100 changes: 100 additions & 0 deletions internal/context/sm_ue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package context

import "sync"

type UeData struct {
PduSessionCount int // store number of PDU Sessions for each UE
SdmSubscriptionId string // store SDM Subscription ID per UE
}

type Ues struct {
ues map[string]UeData // map to store UE data with SUPI as key
mu sync.Mutex // mutex for concurrent access
}

func InitSmfUeData() *Ues {
return &Ues{
ues: make(map[string]UeData),
}
}

// IncrementPduSessionCount increments the PDU session count for a given UE.
func (u *Ues) IncrementPduSessionCount(ueId string) {
u.mu.Lock()
defer u.mu.Unlock()

ueData := u.ues[ueId]
ueData.PduSessionCount++
u.ues[ueId] = ueData
}

// DecrementPduSessionCount decrements the PDU session count for a given UE.
func (u *Ues) DecrementPduSessionCount(ueId string) {
u.mu.Lock()
defer u.mu.Unlock()

ueData := u.ues[ueId]
if ueData.PduSessionCount > 0 {
ueData.PduSessionCount--
u.ues[ueId] = ueData
}
}

// SetSubscriptionId sets the SDM subscription ID for a given UE.
func (u *Ues) SetSubscriptionId(ueId, subscriptionId string) {
u.mu.Lock()
defer u.mu.Unlock()

ueData := u.ues[ueId]
ueData.SdmSubscriptionId = subscriptionId
u.ues[ueId] = ueData
}

// GetSubscriptionId returns the SDM subscription ID for a given UE.
func (u *Ues) GetSubscriptionId(ueId string) string {
u.mu.Lock()
defer u.mu.Unlock()

return u.ues[ueId].SdmSubscriptionId
}

// GetUeData returns the data for a given UE.
func (u *Ues) GetUeData(ueId string) UeData {
u.mu.Lock()
defer u.mu.Unlock()

return u.ues[ueId]
}

// DeleteUe deletes a UE.
func (u *Ues) DeleteUe(ueId string) {
u.mu.Lock()
defer u.mu.Unlock()

delete(u.ues, ueId)
}

// UeExists checks if a UE already exists.
func (u *Ues) UeExists(ueId string) bool {
u.mu.Lock()
defer u.mu.Unlock()

_, exists := u.ues[ueId]
return exists
}

// IsLastPduSession checks if it is the last PDU session for a given UE.
func (u *Ues) IsLastPduSession(ueID string) bool {
u.mu.Lock()
defer u.mu.Unlock()

return u.ues[ueID].PduSessionCount == 1
}

// GetPduSessionCount returns the number of sessions for a given UE.
func (u *Ues) GetPduSessionCount(ueId string) int {
u.mu.Lock()
defer u.mu.Unlock()

return u.ues[ueId].PduSessionCount
}
110 changes: 108 additions & 2 deletions internal/sbi/consumer/udm_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,10 @@ func (s *nudmService) GetSmData(ctx context.Context, supi string,
var client *Nudm_SubscriberDataManagement.APIClient
for _, service := range *s.consumer.Context().UDMProfile.NfServices {
if service.ServiceName == models.ServiceName_NUDM_SDM {
SDMConf := Nudm_SubscriberDataManagement.NewConfiguration()
SDMConf.SetBasePath(service.ApiPrefix)
client = s.getSubscribeDataManagementClient(service.ApiPrefix)
if client != nil {
break
}
}
}

Expand All @@ -199,3 +200,108 @@ func (s *nudmService) GetSmData(ctx context.Context, supi string,

return sessSubData, rsp, err
}

func (s *nudmService) Subscribe(ctx context.Context, smCtx *smf_context.SMContext, smPlmnID *models.PlmnId) (
*models.ProblemDetails, error,
) {
var client *Nudm_SubscriberDataManagement.APIClient
for _, service := range *s.consumer.Context().UDMProfile.NfServices {
if service.ServiceName == models.ServiceName_NUDM_SDM {
client = s.getSubscribeDataManagementClient(service.ApiPrefix)
if client != nil {
break
}
}
}

if client == nil {
return nil, fmt.Errorf("sdm client failed")
}

sdmSubscription := models.SdmSubscription{
NfInstanceId: s.consumer.Context().NfInstanceID,
PlmnId: smPlmnID,
}

resSubscription, httpResp, localErr := client.SubscriptionCreationApi.Subscribe(
ctx, smCtx.Supi, sdmSubscription)
defer func() {
if httpResp != nil {
if rspCloseErr := httpResp.Body.Close(); rspCloseErr != nil {
logger.ConsumerLog.Errorf("Subscribe response body cannot close: %+v",
rspCloseErr)
}
}
}()

if localErr == nil {
s.consumer.Context().Ues.SetSubscriptionId(smCtx.Supi, resSubscription.SubscriptionId)
logger.PduSessLog.Infoln("SDM Subscription Successful UE:", smCtx.Supi, "SubscriptionId:",
resSubscription.SubscriptionId)
} else if httpResp != nil {
if httpResp.Status != localErr.Error() {
return nil, localErr
}
problem := localErr.(openapi.GenericOpenAPIError).Model().(models.ProblemDetails)
return &problem, nil
} else {
return nil, openapi.ReportError("server no response")
}

s.consumer.Context().Ues.IncrementPduSessionCount(smCtx.Supi)
return nil, nil
}

func (s *nudmService) UnSubscribe(smCtx *smf_context.SMContext) (
*models.ProblemDetails, error,
) {
ctx, _, oauthErr := s.consumer.Context().GetTokenCtx(models.ServiceName_NUDM_SDM, models.NfType_UDM)
if oauthErr != nil {
return nil, fmt.Errorf("Get Token Context Error[%v]", oauthErr)
}

if s.consumer.Context().Ues.IsLastPduSession(smCtx.Supi) {
var client *Nudm_SubscriberDataManagement.APIClient
for _, service := range *s.consumer.Context().UDMProfile.NfServices {
if service.ServiceName == models.ServiceName_NUDM_SDM {
client = s.getSubscribeDataManagementClient(service.ApiPrefix)
if client != nil {
break
}
}
}

if client == nil {
return nil, fmt.Errorf("sdm client failed")
}

subscriptionId := s.consumer.Context().Ues.GetSubscriptionId(smCtx.Supi)

httpResp, localErr := client.SubscriptionDeletionApi.Unsubscribe(ctx, smCtx.Supi, subscriptionId)
defer func() {
if httpResp != nil {
if rspCloseErr := httpResp.Body.Close(); rspCloseErr != nil {
logger.PduSessLog.Errorf("Unsubscribe response body cannot close: %+v",
rspCloseErr)
}
}
}()
if localErr == nil {
logger.PduSessLog.Infoln("SDM UnSubscription Successful UE:", smCtx.Supi, "SubscriptionId:",
subscriptionId)
} else if httpResp != nil {
if httpResp.Status != localErr.Error() {
return nil, localErr
}
problem := localErr.(openapi.GenericOpenAPIError).Model().(models.ProblemDetails)
return &problem, nil
} else {
return nil, openapi.ReportError("server no response")
}
s.consumer.Context().Ues.DeleteUe(smCtx.Supi)
} else {
s.consumer.Context().Ues.DecrementPduSessionCount(smCtx.Supi)
}

return nil, nil
}
38 changes: 38 additions & 0 deletions internal/sbi/processor/pdu_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ func (p *Processor) HandlePDUSessionSMContextCreate(
}
}

if !p.Context().Ues.UeExists(smContext.Supi) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the error happens after the SDM data changed?
I think use the defer would be better in this case:

defer func(err error) {
  if err != nil {
	if !p.Context().Ues.UeExists(smContext.Supi) {
		if problemDetails, err := p.Consumer().
			Subscribe(ctx, smContext, smPlmnID); problemDetails != nil {
			smContext.Log.Errorln("SDM Subscription Failed Problem:", problemDetails)
		} else if err != nil {
			smContext.Log.Errorln("SDM Subscription Error:", err)
		}
	} else {
		p.Context().Ues.IncrementPduSessionCount(smContext.Supi)
	}
  }
}(err)

if problemDetails, err := p.Consumer().
Subscribe(ctx, smContext, smPlmnID); problemDetails != nil {
smContext.Log.Errorln("SDM Subscription Failed Problem:", problemDetails)
} else if err != nil {
smContext.Log.Errorln("SDM Subscription Error:", err)
}
} else {
p.Context().Ues.IncrementPduSessionCount(smContext.Supi)
}

establishmentRequest := m.PDUSessionEstablishmentRequest
if err := HandlePDUSessionEstablishmentRequest(smContext, establishmentRequest); err != nil {
smContext.Log.Errorf("PDU Session Establishment fail by %s", err)
Expand Down Expand Up @@ -325,6 +336,15 @@ func (p *Processor) HandlePDUSessionSMContextUpdate(
}
}

if smf_context.GetSelf().Ues.UeExists(smContext.Supi) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.
Please do this action by using defer.

problemDetails, clientErr := p.Consumer().UnSubscribe(smContext)
if problemDetails != nil {
logger.PduSessLog.Errorf("SDM UnSubscription Failed Problem[%+v]", problemDetails)
} else if clientErr != nil {
logger.PduSessLog.Errorf("SDM UnSubscription Error[%+v]", err)
}
}

if smContext.UeCmRegistered {
problemDetails, errUeCmDeregistration := p.Consumer().UeCmDeregistration(smContext)
if problemDetails != nil {
Expand Down Expand Up @@ -878,6 +898,15 @@ func (p *Processor) HandlePDUSessionSMContextRelease(
}
}

if p.Context().Ues.UeExists(smContext.Supi) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.
Please do this action by using defer.

problemDetails, err := p.Consumer().UnSubscribe(smContext)
if problemDetails != nil {
logger.PduSessLog.Errorf("SDM UnSubscription Failed Problem[%+v]", problemDetails)
} else if err != nil {
logger.PduSessLog.Errorf("SDM UnSubscription Error[%+v]", err)
}
}

if smContext.UeCmRegistered {
problemDetails, err := p.Consumer().UeCmDeregistration(smContext)
if problemDetails != nil {
Expand Down Expand Up @@ -969,6 +998,15 @@ func (p *Processor) HandlePDUSessionSMContextLocalRelease(
}
}

if p.Context().Ues.UeExists(smContext.Supi) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.
Please do this action by using defer.

problemDetails, err := p.Consumer().UnSubscribe(smContext)
if problemDetails != nil {
logger.PduSessLog.Errorf("SDM UnSubscription Failed Problem[%+v]", problemDetails)
} else if err != nil {
logger.PduSessLog.Errorf("SDM UnSubscription Error[%+v]", err)
}
}

if smContext.UeCmRegistered {
problemDetails, err := p.Consumer().UeCmDeregistration(smContext)
if problemDetails != nil {
Expand Down
9 changes: 9 additions & 0 deletions internal/sbi/processor/sm_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ func (p *Processor) RemoveSMContextFromAllNF(smContext *smf_context.SMContext, s
}
}

if smf_context.GetSelf().Ues.UeExists(smContext.Supi) {
problemDetails, err := p.Consumer().UnSubscribe(smContext)
if problemDetails != nil {
smContext.Log.Errorf("SDM UnSubscription Failed Problem[%+v]", problemDetails)
} else if err != nil {
smContext.Log.Errorf("SDM UnSubscription Error[%+v]", err)
}
}

// Because the amfUE who called this SMF API is being locked until the API Handler returns,
// sending SMContext Status Notification should run asynchronously
// so that this function returns immediately.
Expand Down
Loading