Skip to content

Commit

Permalink
DB model extended for visited countries. DB model adapted for new key…
Browse files Browse the repository at this point in the history
… format. Insertion and retrieval for v1/v2 non international working with visited countries.
  • Loading branch information
martinalig committed Oct 15, 2020
1 parent 0f0c6fa commit c5ee830
Show file tree
Hide file tree
Showing 18 changed files with 399 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2020 Ubique Innovation AG <https://www.ubique.ch>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* SPDX-License-Identifier: MPL-2.0
*/

package db.migration.hsqldb;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V2_0_1__SetVisitedForExistingKeys extends BaseJavaMigration {

private static final Logger logger =
LoggerFactory.getLogger(V2_0_1__SetVisitedForExistingKeys.class);

@Override
public void migrate(Context context) throws Exception {

// HSQLDB: For Testing purposes only, set origin to CH
String originCountry = "CH";

// Get all key ids
List<Integer> keyIds = new ArrayList<>();
try (Statement select = context.getConnection().createStatement()) {
try (ResultSet rows = select.executeQuery("select pk_exposed_id from t_gaen_exposed")) {
while (rows.next()) {
int id = rows.getInt(1);
keyIds.add(id);
}
}
}
logger.info("Found " + keyIds.size() + " keys for migration");

// For each key, insert origin country as visited country
if (!keyIds.isEmpty()) {
try (PreparedStatement insertVisitedCountries =
context
.getConnection()
.prepareStatement("insert into t_visited(pfk_exposed_id, country) values(?, ?)")) {
for (Integer keyId : keyIds) {
insertVisitedCountries.setInt(1, keyId);
insertVisitedCountries.setString(2, originCountry);
insertVisitedCountries.addBatch();
}
insertVisitedCountries.executeBatch();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2020 Ubique Innovation AG <https://www.ubique.ch>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* SPDX-License-Identifier: MPL-2.0
*/

package db.migration.pgsql;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V2_0_1__SetVisitedForExistingKeys extends BaseJavaMigration {

private static final String ORIGIN_COUNTRY_SYS_VAR = "ws.origin.country";

private static final Logger logger =
LoggerFactory.getLogger(V2_0_1__SetVisitedForExistingKeys.class);

@Override
public void migrate(Context context) throws Exception {

String originCountry = System.getProperty(ORIGIN_COUNTRY_SYS_VAR);
if (originCountry == null || originCountry.isBlank()) {
throw new IllegalArgumentException(
"For successfull migration to the DP3T V2 database schema the country of origin must be"
+ " specified as system variable: "
+ ORIGIN_COUNTRY_SYS_VAR
+ " (for example: java -jar dp3t-sdk.jar -Dws.origin.country=CH ... )");
}

// Get all key ids
List<Integer> keyIds = new ArrayList<>();
try (Statement select = context.getConnection().createStatement()) {
try (ResultSet rows = select.executeQuery("select pk_exposed_id from t_gaen_exposed")) {
while (rows.next()) {
int id = rows.getInt(1);
keyIds.add(id);
}
}
}
logger.info("Found " + keyIds.size() + " keys for migration");

// For each key, insert origin country as visited country
try (PreparedStatement insertVisitedCountries =
context
.getConnection()
.prepareStatement("insert into t_visited(pfk_exposed_id, country) values(?, ?)")) {
for (Integer keyId : keyIds) {
insertVisitedCountries.setInt(1, keyId);
insertVisitedCountries.setString(2, originCountry);
insertVisitedCountries.addBatch();
}
insertVisitedCountries.executeBatch();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2020 Ubique Innovation AG <https://www.ubique.ch>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* SPDX-License-Identifier: MPL-2.0
*/

package db.migration.pgsql_cluster;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V2_0_1__SetVisitedForExistingKeys extends BaseJavaMigration {

private static final String ORIGIN_COUNTRY_SYS_VAR = "ws.origin.country";

private static final Logger logger =
LoggerFactory.getLogger(V2_0_1__SetVisitedForExistingKeys.class);

@Override
public void migrate(Context context) throws Exception {

String originCountry = System.getProperty(ORIGIN_COUNTRY_SYS_VAR);
if (originCountry == null || originCountry.isBlank()) {
throw new IllegalArgumentException(
"For successfull migration to the DP3T V2 database schema the country of origin must be"
+ " specified as system variable: "
+ ORIGIN_COUNTRY_SYS_VAR
+ " (for example: java -jar dp3t-sdk.jar -Dws.origin.country=CH ... )");
}

// Get all key ids
List<Integer> keyIds = new ArrayList<>();
try (Statement select = context.getConnection().createStatement()) {
try (ResultSet rows = select.executeQuery("select pk_exposed_id from t_gaen_exposed")) {
while (rows.next()) {
int id = rows.getInt(1);
keyIds.add(id);
}
}
}
logger.info("Found " + keyIds.size() + " keys for migration");

// For each key, insert origin country as visited country
try (PreparedStatement insertVisitedCountries =
context
.getConnection()
.prepareStatement("insert into t_visited(pfk_exposed_id, country) values(?, ?)")) {
for (Integer keyId : keyIds) {
insertVisitedCountries.setInt(1, keyId);
insertVisitedCountries.setString(2, originCountry);
insertVisitedCountries.addBatch();
}
insertVisitedCountries.executeBatch();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void updateFakeKeys() {
byte[] keyData = new byte[keySize];
random.nextBytes(keyData);
var keyGAENTime = (int) tmpDate.get10MinutesSince1970();
var key = new GaenKey(Base64.getEncoder().encodeToString(keyData), keyGAENTime, 144, 0);
var key = new GaenKey(Base64.getEncoder().encodeToString(keyData), keyGAENTime, 144);
keys.add(key);
}
// TODO: Check if currentKeyDate is indeed intended here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public GaenKey mapRow(ResultSet rs, int rowNum) throws SQLException {
gaenKey.setKeyData(rs.getString("key"));
gaenKey.setRollingStartNumber(rs.getInt("rolling_start_number"));
gaenKey.setRollingPeriod(rs.getInt("rolling_period"));
gaenKey.setTransmissionRiskLevel(rs.getInt("transmission_risk_level"));
return gaenKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import org.springframework.transaction.annotation.Transactional;

public class JDBCGAENDataServiceImpl implements GAENDataService {
Expand All @@ -35,12 +37,21 @@ public class JDBCGAENDataServiceImpl implements GAENDataService {
// https://developer.apple.com/documentation/exposurenotification/setting_up_a_key_server?language=objc)
private final Duration timeSkew;

// the origin country is used for the "default" visited country for all insertions that do not
// provide the visited countries for the key, so all v1 and non-international v2 inserted keys.
private final String originCountry;

public JDBCGAENDataServiceImpl(
String dbType, DataSource dataSource, Duration releaseBucketDuration, Duration timeSkew) {
String dbType,
DataSource dataSource,
Duration releaseBucketDuration,
Duration timeSkew,
String originCountry) {
this.dbType = dbType;
this.jt = new NamedParameterJdbcTemplate(dataSource);
this.releaseBucketDuration = releaseBucketDuration;
this.timeSkew = timeSkew;
this.originCountry = originCountry;
}

@Override
Expand All @@ -50,43 +61,71 @@ public void upsertExposees(List<GaenKey> gaenKeys, UTCInstant now) {
}

@Override
@Transactional(readOnly = false)
public void upsertExposeesDelayed(
List<GaenKey> gaenKeys, UTCInstant delayedReceivedAt, UTCInstant now) {

String sql = null;
String sqlKey = null;
String sqlVisited = null;
if (dbType.equals(PGSQL)) {
sql =
sqlKey =
"insert into t_gaen_exposed (key, rolling_start_number, rolling_period,"
+ " transmission_risk_level, received_at) values (:key, :rolling_start_number,"
+ " :rolling_period, :transmission_risk_level, :received_at) on conflict on"
+ " received_at) values (:key, :rolling_start_number,"
+ " :rolling_period, :received_at) on conflict on"
+ " constraint gaen_exposed_key do nothing";
sqlVisited =
"insert into t_visited (pfk_exposed_id, country) values (:keyId, :country) on conflict on"
+ " constraint PK_t_visited do nothing";
} else {
sql =
sqlKey =
"merge into t_gaen_exposed using (values(cast(:key as varchar(24)),"
+ " :rolling_start_number, :rolling_period, :transmission_risk_level, :received_at))"
+ " as vals(key, rolling_start_number, rolling_period, transmission_risk_level,"
+ " :rolling_start_number, :rolling_period, :received_at))"
+ " as vals(key, rolling_start_number, rolling_period,"
+ " received_at) on t_gaen_exposed.key = vals.key when not matched then insert (key,"
+ " rolling_start_number, rolling_period, transmission_risk_level, received_at)"
+ " rolling_start_number, rolling_period, received_at)"
+ " values (vals.key, vals.rolling_start_number, vals.rolling_period,"
+ " transmission_risk_level, vals.received_at)";
+ " vals.received_at)";
sqlVisited =
"merge into t_visited using (values(:keyId, :country)) as vals(keyId, country) on"
+ " t_visited.pfk_exposed_id = vals.keyId and t_visited.country = vals.country when"
+ " not matched then insert (pfk_exposed_id, country) values (vals.keyId,"
+ " vals.country)";
}
var parameterList = new ArrayList<MapSqlParameterSource>();

// Calculate the `receivedAt` just at the end of the current releaseBucket.
var receivedAt =
delayedReceivedAt == null
? now.roundToNextBucket(releaseBucketDuration).minus(Duration.ofMillis(1))
: delayedReceivedAt;

List<MapSqlParameterSource> visitedBatch = new ArrayList<>();

for (var gaenKey : gaenKeys) {
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("key", gaenKey.getKeyData());
params.addValue("rolling_start_number", gaenKey.getRollingStartNumber());
params.addValue("rolling_period", gaenKey.getRollingPeriod());
params.addValue("transmission_risk_level", gaenKey.getTransmissionRiskLevel());
params.addValue("received_at", receivedAt.getDate());
KeyHolder keyHolder = new GeneratedKeyHolder();
jt.update(sqlKey, params, keyHolder);

// if the key already exists, no ids are returned. in this case we assume that we do not need
// to modify the visited countries also
if (keyHolder.getKeys() != null && !keyHolder.getKeys().isEmpty()) {
Object keyObject = keyHolder.getKeys().get("pk_exposed_id");
if (keyObject != null) {
int gaenKeyId = ((Integer) keyObject).intValue();
MapSqlParameterSource visitedParams = new MapSqlParameterSource();
visitedParams.addValue("keyId", gaenKeyId);
visitedParams.addValue("country", originCountry);
visitedBatch.add(visitedParams);
}
}
}

parameterList.add(params);
if (!visitedBatch.isEmpty()) {
jt.batchUpdate(
sqlVisited, visitedBatch.toArray(new MapSqlParameterSource[visitedBatch.size()]));
}
jt.batchUpdate(sql, parameterList.toArray(new MapSqlParameterSource[0]));
}

@Override
Expand All @@ -99,7 +138,7 @@ public List<GaenKey> getSortedExposedForKeyDate(
params.addValue("publishedUntil", publishedUntil.getDate());

String sql =
"select pk_exposed_id, key, rolling_start_number, rolling_period, transmission_risk_level"
"select pk_exposed_id, key, rolling_start_number, rolling_period"
+ " from t_gaen_exposed where rolling_start_number >= :rollingPeriodStartNumberStart"
+ " and rolling_start_number < :rollingPeriodStartNumberEnd and received_at <"
+ " :publishedUntil";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Created by Ubique Innovation AG
* https://www.ubique.ch
* Copyright (c) 2020. All rights reserved.
*/

alter table t_gaen_exposed drop column transmission_risk_level;

alter table t_gaen_exposed add column report_type varchar(30);
alter table t_gaen_exposed add column days_since_onset_of_symptoms integer;

CREATE TABLE t_visited
(
pfk_exposed_id integer NOT NULL,
country varchar(10) NOT NULL,
CONSTRAINT PK_t_visited PRIMARY KEY ( pfk_exposed_id, country ),
CONSTRAINT r_gaen_exposed_visited FOREIGN KEY ( pfk_exposed_id ) REFERENCES t_gaen_exposed ( pk_exposed_id ) ON DELETE CASCADE
);

CREATE INDEX idx_visited_exposed_id ON t_visited
(
pfk_exposed_id
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Created by Ubique Innovation AG
* https://www.ubique.ch
* Copyright (c) 2020. All rights reserved.
*/

alter table t_gaen_exposed drop column transmission_risk_level;

alter table t_gaen_exposed add column report_type varchar(30);
alter table t_gaen_exposed add column days_since_onset_of_symptoms integer;

CREATE TABLE t_visited
(
pfk_exposed_id integer NOT NULL,
country varchar(10) NOT NULL,
CONSTRAINT PK_t_visited PRIMARY KEY ( pfk_exposed_id, country ),
CONSTRAINT r_gaen_exposed_visited FOREIGN KEY ( pfk_exposed_id ) REFERENCES t_gaen_exposed ( pk_exposed_id ) ON DELETE CASCADE
);

CREATE INDEX idx_visited_exposed_id ON t_visited
(
pfk_exposed_id
);
Loading

0 comments on commit c5ee830

Please sign in to comment.