Skip to content

Commit

Permalink
Merge pull request #34 from weaveworks/DEV-10831
Browse files Browse the repository at this point in the history
Add SaaS gateway sink to admission sink list
  • Loading branch information
nohasaayed authored May 24, 2022
2 parents 81ed647 + 1166b58 commit ab6d14b
Showing 1 changed file with 34 additions and 20 deletions.
54 changes: 34 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,14 +337,24 @@ func main() {

if config.GatewaySinkURL != "" {
logger.Info("initializing SaaS gateway sink...")
gatewaySink, err := initSaaSSink(contextCli.Context, mgr, kubeClient, config)
gateway, err := initSaaSGateway(contextCli.Context, kubeClient, config)
if err != nil {
return err
}
if config.EnableAudit {
gatewaySink, err := initSaaSSink(contextCli.Context, mgr, kubeClient, config, gateway, packet.PacketPolicyValidationAudit)
if err != nil {
return err
}
auditSinks = append(auditSinks, gatewaySink)
}
if config.EnableAdmission {
logger.Warn("ignoring SaaS gateway sink for admission validation")
gatewaySink, err := initSaaSSink(contextCli.Context, mgr, kubeClient, config, gateway, packet.PacketPolicyValidationAdmission)
if err != nil {
return err
}
admissionSinks = append(admissionSinks, gatewaySink)
}
auditSinks = append(auditSinks, gatewaySink)
}

if config.EnableAudit {
Expand Down Expand Up @@ -463,7 +473,26 @@ func initK8sEventSink(mgr manager.Manager, config Config) (*k8s_event.K8sEventSi
return sink, nil
}

func initSaaSSink(ctx context.Context, mgr manager.Manager, kubeClient *kube.KubeClient, config Config) (*saas.SaaSGatewaySink, error) {
func initSaaSSink(ctx context.Context, mgr manager.Manager, kubeClient *kube.KubeClient, config Config, gateway *gateway.Gateway, packetKind packet.PacketKind) (*saas.SaaSGatewaySink, error) {
sink := saas.NewSaaSGatewaySink(
gateway,
packetKind,
SaaSSinkBatchSize,
SaaSSinkBatchExpiry,
)
logger.Info("starting SaaS gateway connection")
go gateway.Start(ctx)
active := gateway.WaitActive(ctx, 10*time.Second)
if !active {
return nil, errors.New("timeout while waiting for SaaS gateway connection")
}
logger.Info("starting Saas gateway sink ...")
mgr.Add(sink)

return sink, nil
}

func initSaaSGateway(ctx context.Context, kubeClient *kube.KubeClient, config Config) (*gateway.Gateway, error) {
secret, err := base64.StdEncoding.DecodeString(config.GatewaySinkSecret)
if err != nil {
return nil, errors.New("secret not encoded in base64 format")
Expand Down Expand Up @@ -519,20 +548,5 @@ func initSaaSSink(ctx context.Context, mgr manager.Manager, kubeClient *kube.Kub
agentPermissions,
build,
)
sink := saas.NewSaaSGatewaySink(
gateway,
packet.PacketPolicyValidationAudit,
SaaSSinkBatchSize,
SaaSSinkBatchExpiry,
)
logger.Info("starting SaaS gateway connection")
go gateway.Start(ctx)
active := gateway.WaitActive(ctx, 10*time.Second)
if !active {
return nil, errors.New("timeout while waiting for SaaS gateway connection")
}
logger.Info("starting Saas gateway sink ...")
mgr.Add(sink)

return sink, nil
return gateway, nil
}

0 comments on commit ab6d14b

Please sign in to comment.