Skip to content

Commit

Permalink
EFGS downloaded keys upsert through insert manager
Browse files Browse the repository at this point in the history
  • Loading branch information
ubhaller committed Mar 11, 2021
1 parent 3d30b3e commit 17a5906
Show file tree
Hide file tree
Showing 17 changed files with 435 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
public interface GaenDataService {

/**
* Upserts (Update or Inserts) the given key received from interops synchronization.
* Upserts (Update or Inserts) the given keys received from interops synchronization.
*
* @param key the exposed key to upsert
* @param keys the exposed keys to upsert
* @param now time of the sync
* @param origin the origin or the key
* @param batchTag batchTag of downloaded key
*/
void upsertExposeeFromInterops(GaenKey key, UTCInstant now, String origin);
void upsertExposeeFromInterops(
List<GaenKey> keys, UTCInstant now, String origin, String batchTag);

/**
* Upserts (Update or Inserts) the given list of exposed keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ public JdbcGaenDataServiceImpl(

@Override
@Transactional(readOnly = false)
public void upsertExposeeFromInterops(GaenKey gaenKey, UTCInstant now, String origin) {
internalUpsertKey(gaenKey, now, origin, true);
public void upsertExposeeFromInterops(
List<GaenKey> gaenKeys, UTCInstant now, String origin, String batchTag) {
for (GaenKey gaenKey : gaenKeys) {
internalUpsertKey(gaenKey, now, origin, batchTag, true);
}
}

@Override
Expand All @@ -84,7 +87,7 @@ public void upsertExposeesDelayed(
: delayedReceivedAt;

for (var gaenKey : gaenKeys) {
internalUpsertKey(gaenKey, receivedAt, this.originCountry, withFederationGateway);
internalUpsertKey(gaenKey, receivedAt, this.originCountry, null, withFederationGateway);
}
}

Expand Down Expand Up @@ -294,25 +297,30 @@ public void cleanDB(Duration retentionPeriod) {
}

private void internalUpsertKey(
GaenKey gaenKey, UTCInstant receivedAt, String origin, boolean withFederationGateway) {
GaenKey gaenKey,
UTCInstant receivedAt,
String origin,
String batchTag,
boolean withFederationGateway) {
String sqlKey = null;
if (dbType.equals(PGSQL)) {
sqlKey =
"insert into t_gaen_exposed (key, rolling_start_number, rolling_period, received_at,"
+ " origin, share_with_federation_gateway) values (:key, :rolling_start_number,"
+ " :rolling_period, :received_at, :origin, :share_with_federation_gateway) on"
+ " origin, share_with_federation_gateway, batch_tag)"
+ " values (:key, :rolling_start_number, :rolling_period, :received_at,"
+ " :origin, :share_with_federation_gateway, :batch_tag) on"
+ " conflict on constraint gaen_exposed_key do nothing";
} else {
sqlKey =
"merge into t_gaen_exposed using (values(cast(:key as varchar(24)),"
+ " :rolling_start_number, :rolling_period, :received_at, cast(:origin as"
+ " varchar(10)), :share_with_federation_gateway)) as vals(key,"
+ " rolling_start_number, rolling_period, received_at, origin,"
+ " share_with_federation_gateway) on t_gaen_exposed.key = vals.key when not matched"
+ " then insert (key, rolling_start_number, rolling_period, received_at, origin,"
+ " share_with_federation_gateway) values (vals.key, vals.rolling_start_number,"
+ " vals.rolling_period, vals.received_at, vals.origin,"
+ " vals.share_with_federation_gateway)";
+ " varchar(10)), :share_with_federation_gateway, cast(:batch_tag as varchar(50))))"
+ " as vals(key, rolling_start_number, rolling_period, received_at, origin,"
+ " share_with_federation_gateway, batch_tag) on t_gaen_exposed.key = vals.key"
+ " when not matched then insert (key, rolling_start_number, rolling_period,"
+ " received_at, origin, share_with_federation_gateway, batch_tag)"
+ " values (vals.key, vals.rolling_start_number, vals.rolling_period,"
+ " vals.received_at, vals.origin, vals.share_with_federation_gateway, vals.batch_tag)";
}

MapSqlParameterSource params = new MapSqlParameterSource();
Expand All @@ -322,6 +330,7 @@ private void internalUpsertKey(
params.addValue("received_at", receivedAt.getDate());
params.addValue("origin", origin);
params.addValue("share_with_federation_gateway", withFederationGateway);
params.addValue("batch_tag", batchTag);
KeyHolder keyHolder = new GeneratedKeyHolder();
jt.update(sqlKey, params, keyHolder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
import org.dpppt.backend.sdk.data.gaen.JdbcGaenDataServiceImpl;
import org.dpppt.backend.sdk.data.interops.JdbcSyncLogDataServiceImpl;
import org.dpppt.backend.sdk.data.interops.SyncLogDataService;
import org.dpppt.backend.sdk.interops.insertmanager.InteropsInsertManager;
import org.dpppt.backend.sdk.interops.insertmanager.insertionfilters.AssertKeyFormat;
import org.dpppt.backend.sdk.interops.insertmanager.insertionfilters.DsosFilter;
import org.dpppt.backend.sdk.interops.insertmanager.insertionfilters.EnforceRetentionPeriod;
import org.dpppt.backend.sdk.interops.insertmanager.insertionfilters.EnforceValidRollingPeriod;
import org.dpppt.backend.sdk.interops.insertmanager.insertionfilters.RemoveKeysFromFuture;
import org.dpppt.backend.sdk.interops.model.HubConfigs;
import org.dpppt.backend.sdk.interops.syncer.EfgsHubSyncer;
import org.dpppt.backend.sdk.interops.syncer.efgs.EfgsClient;
Expand All @@ -37,6 +43,9 @@ public abstract class WSBaseConfig implements WebMvcConfigurer {
@Value("${ws.exposedlist.releaseBucketDuration: 7200000}")
long releaseBucketDuration;

@Value("${ws.app.gaen.key_size: 16}")
int gaenKeySizeBytes;

@Value("${ws.app.gaen.timeskew:PT2h}")
Duration timeSkew;

Expand Down Expand Up @@ -71,8 +80,24 @@ public EfgsClient efgsClient(HubConfigs hubConfigs) throws CertificateException
public EfgsHubSyncer efgsHubSyncer(
EfgsClient efgsClient,
GaenDataService gaenDataService,
SyncLogDataService syncLogDataService) {
SyncLogDataService syncLogDataService,
InteropsInsertManager interopsInsertManager) {
return new EfgsHubSyncer(
efgsClient, Duration.ofDays(retentionDays), gaenDataService, syncLogDataService);
efgsClient,
Duration.ofDays(retentionDays),
gaenDataService,
syncLogDataService,
interopsInsertManager);
}

@Bean
public InteropsInsertManager interopsInsertManager(GaenDataService gaenDataService) {
var manager = new InteropsInsertManager(gaenDataService);
manager.addFilter(new AssertKeyFormat(gaenKeySizeBytes));
manager.addFilter(new RemoveKeysFromFuture());
manager.addFilter(new EnforceRetentionPeriod(Duration.ofDays(retentionDays)));
manager.addFilter(new EnforceValidRollingPeriod());
manager.addFilter(new DsosFilter());
return manager;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package org.dpppt.backend.sdk.interops.insertmanager;

import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.dpppt.backend.sdk.data.gaen.GaenDataService;
import org.dpppt.backend.sdk.interops.insertmanager.insertionfilters.InteropsKeyInsertionFilter;
import org.dpppt.backend.sdk.interops.insertmanager.insertionmodifier.InteropsKeyInsertionModifier;
import org.dpppt.backend.sdk.model.gaen.GaenKeyForInterops;
import org.dpppt.backend.sdk.utils.UTCInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The insertion manager is responsible for inserting keys downloaded from an international
* gateway/hub into the database. To make sure we only have valid keys in the database, a list of
* {@link InteropsKeyInsertionModifier} is applied, and then a list of {@link
* InteropsKeyInsertionFilter} is applied to the given list of keys. The remaining keys are then
* inserted into the database.
*/
public class InteropsInsertManager {

private final List<InteropsKeyInsertionFilter> filterList = new ArrayList<>();
private final List<InteropsKeyInsertionModifier> modifierList = new ArrayList<>();

private final GaenDataService dataService;

private static final Logger logger = LoggerFactory.getLogger(InteropsInsertManager.class);

public InteropsInsertManager(GaenDataService dataService) {
this.dataService = dataService;
}

public void addFilter(InteropsKeyInsertionFilter filter) {
this.filterList.add(filter);
}

public void addModifier(InteropsKeyInsertionModifier modifier) {
this.modifierList.add(modifier);
}

/**
* Inserts the keys into the database. The additional parameters are supplied to the configured
* modifiers and filters.
*
* @param keys the list of downloaded international keys
* @param now current timestamp to work with.
* @param batchTag
*/
public void insertIntoDatabase(List<GaenKeyForInterops> keys, UTCInstant now, String batchTag) {
if (keys == null || keys.isEmpty()) {
return;
}
var internalKeys = modifyAndFilter(keys, now);
if (!internalKeys.isEmpty()) {
for (Entry<String, List<GaenKeyForInterops>> keysForOrigin :
internalKeys.stream()
.collect(Collectors.groupingBy(GaenKeyForInterops::getOrigin))
.entrySet()) {
dataService.upsertExposeeFromInterops(
keysForOrigin.getValue().stream()
.map(GaenKeyForInterops::getGaenKey)
.collect(Collectors.toList()),
now,
keysForOrigin.getKey(),
batchTag);
}
}
}

private List<GaenKeyForInterops> modifyAndFilter(List<GaenKeyForInterops> keys, UTCInstant now) {
var internalKeys = keys;

for (InteropsKeyInsertionModifier modifier : modifierList) {
internalKeys = modifier.modify(now, internalKeys);
}

for (InteropsKeyInsertionFilter filter : filterList) {
int sizeBefore = internalKeys.size();
internalKeys = filter.filter(now, internalKeys);
logger.info(
"{} keys filtered out by {} filter",
(internalKeys.size() - sizeBefore),
filter.getName());
}
return internalKeys;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.dpppt.backend.sdk.interops.insertmanager.insertionfilters;

import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;
import org.dpppt.backend.sdk.model.gaen.GaenKeyForInterops;
import org.dpppt.backend.sdk.utils.UTCInstant;

/** Filters out keys with invalid base64 encoding or incorrect length. */
public class AssertKeyFormat implements InteropsKeyInsertionFilter {

private final int gaenKeySizeBytes;

public AssertKeyFormat(int gaenKeySizeBytes) {
this.gaenKeySizeBytes = gaenKeySizeBytes;
}

@Override
public List<GaenKeyForInterops> filter(UTCInstant now, List<GaenKeyForInterops> content) {
return content.stream()
.filter(key -> isValidKeyFormat(key.getKeyData()))
.collect(Collectors.toList());
}

private boolean isValidKeyFormat(String value) {
try {
byte[] key = Base64.getDecoder().decode(value);
return key.length == gaenKeySizeBytes;
} catch (Exception e) {
return false;
}
}

@Override
public String getName() {
return "AssertKeyFormat";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.dpppt.backend.sdk.interops.insertmanager.insertionfilters;

import java.util.List;
import java.util.stream.Collectors;
import org.dpppt.backend.sdk.model.gaen.GaenKeyForInterops;
import org.dpppt.backend.sdk.utils.UTCInstant;

/**
* Filter keys that have a day since onset of symptoms (dsos) not relevant for our epidemiological
* parameters
*/
public class DsosFilter implements InteropsKeyInsertionFilter {

@Override
public List<GaenKeyForInterops> filter(UTCInstant now, List<GaenKeyForInterops> content) {
return content.stream().filter(key -> !hasIrrelevantDsos(key)).collect(Collectors.toList());
}

private boolean hasIrrelevantDsos(GaenKeyForInterops key) {
Integer dsos = key.getDaysSinceOnsetOfSymptoms();
if (dsos == null) { // keys with dsos null values are dropped
return true;
}
int normalizedDsos = dsos;
if (dsos < 20) { // onset known. dsos in [-14, +14]
// dsos is already normalized
} else if (dsos < 1986) { // onset range `n` days (n < 19). dsos in [n*100-14, n*100+14].
final int nMax = 19;
final int n = dsos + nMax / 100;
int endOfRange = n * 100;
int startOfRange = endOfRange - n;
normalizedDsos = dsos - startOfRange;
} else if (dsos < 2986) { // unknown onset. dsos in [1986, 2014]
normalizedDsos = dsos - 2000;
} else if (dsos < 3986) { // asymptomatic. dsos in [2986, 3014]
normalizedDsos = dsos - 3000;
} else { // unknown symptom status. dsos in [3986, 4014]
normalizedDsos = dsos - 4000;
}
return normalizedDsos < -2;
}

@Override
public String getName() {
return "DSOS";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.dpppt.backend.sdk.interops.insertmanager.insertionfilters;

import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.dpppt.backend.sdk.model.gaen.GaenKeyForInterops;
import org.dpppt.backend.sdk.model.gaen.GaenUnit;
import org.dpppt.backend.sdk.utils.UTCInstant;

/**
* Checks if a key is in the configured retention period. If a key is before the retention period it
* is filtered out, as it will not be relevant for the system anymore.
*/
public class EnforceRetentionPeriod implements InteropsKeyInsertionFilter {

private final Duration retentionPeriod;

public EnforceRetentionPeriod(Duration retentionPeriod) {
this.retentionPeriod = retentionPeriod;
}

@Override
public List<GaenKeyForInterops> filter(UTCInstant now, List<GaenKeyForInterops> content) {
return content.stream()
.filter(
key -> {
var timestamp = UTCInstant.of(key.getRollingStartNumber(), GaenUnit.TenMinutes);
return !timestamp.isBeforeDateOf(now.minus(retentionPeriod));
})
.collect(Collectors.toList());
}

@Override
public String getName() {
return "EnforeRetentionPeriod";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.dpppt.backend.sdk.interops.insertmanager.insertionfilters;

import java.util.List;
import java.util.stream.Collectors;
import org.dpppt.backend.sdk.model.gaen.GaenKeyForInterops;
import org.dpppt.backend.sdk.utils.UTCInstant;

/**
* This filter checks for valid rolling period. The rolling period must always be in [1..144],
* otherwise the key is not valid and is filtered out. See <a href=
* "https://github.com/google/exposure-notifications-server/blob/main/docs/server_functional_requirements.md#publishing-temporary-exposure-keys"
* >EN documentation</a>
*/
public class EnforceValidRollingPeriod implements InteropsKeyInsertionFilter {

@Override
public List<GaenKeyForInterops> filter(UTCInstant now, List<GaenKeyForInterops> content) {
return content.stream()
.filter(key -> key.getRollingPeriod() >= 1 && key.getRollingPeriod() <= 144)
.collect(Collectors.toList());
}

@Override
public String getName() {
return "EnforeValidRollingPeriod";
}
}
Loading

0 comments on commit 17a5906

Please sign in to comment.