Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of schema for pipeline #97

Merged
merged 23 commits into from
Sep 16, 2024
Merged

Refactor of schema for pipeline #97

merged 23 commits into from
Sep 16, 2024

Conversation

hellais
Copy link
Member

@hellais hellais commented Sep 10, 2024

@hellais
Copy link
Member Author

hellais commented Sep 10, 2024

I am going to run the change to alter the table to add the observation_idx column like so:

ALTER TABLE obs_web ADD COLUMN observation_idx UInt8
DEFAULT toInt8(arraySlice(splitByChar('_', observation_id), -1, 1)[1]) + 1

ALTER TABLE obs_web MODIFY ORDER BY (measurement_start_time,
hostname,
probe_cc,
probe_asn,
measurement_uid,
observation_idx)

The above will actually not do it, since the observation_id column was part of the primary key. I am going to instead go for a different approach which involves creating a new table, moving the data into the new table and then doing a rename of the old to the new.

Copy link

codecov bot commented Sep 10, 2024

Codecov Report

Attention: Patch coverage is 72.22222% with 45 lines in your changes missing coverage. Please review.

Project coverage is 82.24%. Comparing base (ff1fc86) to head (2a59271).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
oonipipeline/src/oonipipeline/db/maintenance.py 31.42% 24 Missing ⚠️
oonipipeline/src/oonipipeline/cli/commands.py 69.23% 16 Missing ⚠️
...ine/src/oonipipeline/temporal/activities/common.py 60.00% 2 Missing ⚠️
oonidata/src/oonidata/models/base.py 66.66% 1 Missing ⚠️
...e/src/oonipipeline/temporal/activities/analysis.py 0.00% 1 Missing ⚠️
...oonipipeline/transforms/measurement_transformer.py 80.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #97      +/-   ##
==========================================
+ Coverage   82.14%   82.24%   +0.09%     
==========================================
  Files          82       83       +1     
  Lines        6351     6347       -4     
==========================================
+ Hits         5217     5220       +3     
+ Misses       1134     1127       -7     
Flag Coverage Δ
oonidata 77.36% <66.66%> (-0.08%) ⬇️
oonipipeline 84.34% <72.32%> (+0.17%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@hellais
Copy link
Member Author

hellais commented Sep 11, 2024

In the end the table migrations were run using the following manual migration script (click show to expand)

show
    CREATE TABLE obs_web_new (
     measurement_uid String,
     observation_idx UInt16,
     input Nullable(String),
     report_id String,
     measurement_start_time Datetime64(3, 'UTC'),
     software_name String,
     software_version String,
     test_name String,
     test_version String,
     bucket_date String,
     probe_asn UInt32,
     probe_cc String,
     probe_as_org_name String,
     probe_as_cc String,
     probe_as_name String,
     network_type String,
     platform String,
     origin String,
     engine_name String,
     engine_version String,
     architecture String,
     resolver_ip String,
     resolver_asn UInt32,
     resolver_cc String,
     resolver_as_org_name String,
     resolver_as_cc String,
     resolver_is_scrubbed UInt8,
     resolver_asn_probe UInt32,
     resolver_as_org_name_probe String,
     created_at Nullable(Datetime('UTC')),
     target_id Nullable(String),
     hostname Nullable(String),
     transaction_id Nullable(UInt16),
     ip Nullable(String),
     port Nullable(UInt16),
     ip_asn Nullable(UInt32),
     ip_as_org_name Nullable(String),
     ip_as_cc Nullable(String),
     ip_cc Nullable(String),
     ip_is_bogon Nullable(UInt8),
     dns_query_type Nullable(String),
     dns_failure Nullable(String),
     dns_engine Nullable(String),
     dns_engine_resolver_address Nullable(String),
     dns_answer_type Nullable(String),
     dns_answer Nullable(String),
     dns_answer_asn Nullable(UInt32),
     dns_answer_as_org_name Nullable(String),
     dns_t Nullable(Float64),
     tcp_failure Nullable(String),
     tcp_success Nullable(UInt8),
     tcp_t Nullable(Float64),
     tls_failure Nullable(String),
     tls_server_name Nullable(String),
     tls_version Nullable(String),
     tls_cipher_suite Nullable(String),
     tls_is_certificate_valid Nullable(UInt8),
     tls_end_entity_certificate_fingerprint Nullable(String),
     tls_end_entity_certificate_subject Nullable(String),
     tls_end_entity_certificate_subject_common_name Nullable(String),
     tls_end_entity_certificate_issuer Nullable(String),
     tls_end_entity_certificate_issuer_common_name Nullable(String),
     tls_end_entity_certificate_san_list Array(String),
     tls_end_entity_certificate_not_valid_after Nullable(Datetime64(3, 'UTC')),
     tls_end_entity_certificate_not_valid_before Nullable(Datetime64(3, 'UTC')),
     tls_certificate_chain_length Nullable(UInt16),
     tls_certificate_chain_fingerprints Array(String),
     tls_handshake_read_count Nullable(UInt16),
     tls_handshake_write_count Nullable(UInt16),
     tls_handshake_read_bytes Nullable(UInt32),
     tls_handshake_write_bytes Nullable(UInt32),
     tls_handshake_last_operation Nullable(String),
     tls_handshake_time Nullable(Float64),
     tls_t Nullable(Float64),
     http_request_url Nullable(String),
     http_network Nullable(String),
     http_alpn Nullable(String),
     http_failure Nullable(String),
     http_request_body_length Nullable(UInt32),
     http_request_method Nullable(String),
     http_runtime Nullable(Float64),
     http_response_body_length Nullable(Int32),
     http_response_body_is_truncated Nullable(UInt8),
     http_response_body_sha1 Nullable(String),
     http_response_status_code Nullable(UInt16),
     http_response_header_location Nullable(String),
     http_response_header_server Nullable(String),
     http_request_redirect_from Nullable(String),
     http_request_body_is_truncated Nullable(UInt8),
     http_t Nullable(Float64),
     probe_analysis Nullable(String)
    )
    ENGINE = ReplacingMergeTree
    -- Partition by the month of the bucket
    PARTITION BY concat(substring(bucket_date, 1, 4), substring(bucket_date, 6, 2))
    PRIMARY KEY (measurement_uid, observation_idx)
    ORDER BY (measurement_uid, observation_idx, measurement_start_time, probe_cc, probe_asn)
    SETTINGS index_granularity = 8192;
INSERT INTO obs_web_new (
measurement_uid,
observation_idx,
input,
report_id,
measurement_start_time,
software_name,
software_version,
test_name,
test_version,
probe_asn,
probe_cc,
probe_as_org_name,
probe_as_cc,
probe_as_name,
network_type,
platform,
origin,
engine_name,
engine_version,
architecture,
resolver_ip,
resolver_asn,
resolver_cc,
resolver_as_org_name,
resolver_as_cc,
resolver_is_scrubbed,
resolver_asn_probe,
resolver_as_org_name_probe,
bucket_date,
created_at,
target_id,
hostname,
transaction_id,
ip,
port,
ip_asn,
ip_as_org_name,
ip_as_cc,
ip_cc,
ip_is_bogon,
dns_query_type,
dns_failure,
dns_engine,
dns_engine_resolver_address,
dns_answer_type,
dns_answer,
dns_answer_asn,
dns_answer_as_org_name,
dns_t,
tcp_failure,
tcp_success,
tcp_t,
tls_failure,
tls_server_name,
tls_version,
tls_cipher_suite,
tls_is_certificate_valid,
tls_end_entity_certificate_fingerprint,
tls_end_entity_certificate_subject,
tls_end_entity_certificate_subject_common_name,
tls_end_entity_certificate_issuer,
tls_end_entity_certificate_issuer_common_name,
tls_end_entity_certificate_san_list,
tls_end_entity_certificate_not_valid_after,
tls_end_entity_certificate_not_valid_before,
tls_certificate_chain_length,
tls_certificate_chain_fingerprints,
tls_handshake_read_count,
tls_handshake_write_count,
tls_handshake_read_bytes,
tls_handshake_write_bytes,
tls_handshake_last_operation,
tls_handshake_time,
tls_t,
http_request_url,
http_network,
http_alpn,
http_failure,
http_request_body_length,
http_request_method,
http_runtime,
http_response_body_length,
http_response_body_is_truncated,
http_response_body_sha1,
http_response_status_code,
http_response_header_location,
http_response_header_server,
http_request_redirect_from,
http_request_body_is_truncated,
http_t,
probe_analysis
)
SELECT
measurement_uid,
IF(
    observation_id = '', 1,
    toUInt16(arraySlice(splitByChar('_', observation_id), -1, 1)[1]) + 1
) as observation_idx,
input,
report_id,
measurement_start_time,
software_name,
software_version,
test_name,
test_version,
probe_asn,
probe_cc,
probe_as_org_name,
probe_as_cc,
probe_as_name,
network_type,
platform,
origin,
engine_name,
engine_version,
architecture,
resolver_ip,
resolver_asn,
resolver_cc,
resolver_as_org_name,
resolver_as_cc,
resolver_is_scrubbed,
resolver_asn_probe,
resolver_as_org_name_probe,
bucket_date,
created_at,
target_id,
hostname,
transaction_id,
ip,
port,
ip_asn,
ip_as_org_name,
ip_as_cc,
ip_cc,
ip_is_bogon,
dns_query_type,
dns_failure,
dns_engine,
dns_engine_resolver_address,
dns_answer_type,
dns_answer,
dns_answer_asn,
dns_answer_as_org_name,
dns_t,
tcp_failure,
tcp_success,
tcp_t,
tls_failure,
tls_server_name,
tls_version,
tls_cipher_suite,
tls_is_certificate_valid,
tls_end_entity_certificate_fingerprint,
tls_end_entity_certificate_subject,
tls_end_entity_certificate_subject_common_name,
tls_end_entity_certificate_issuer,
tls_end_entity_certificate_issuer_common_name,
tls_end_entity_certificate_san_list,
tls_end_entity_certificate_not_valid_after,
tls_end_entity_certificate_not_valid_before,
tls_certificate_chain_length,
tls_certificate_chain_fingerprints,
tls_handshake_read_count,
tls_handshake_write_count,
tls_handshake_read_bytes,
tls_handshake_write_bytes,
tls_handshake_last_operation,
tls_handshake_time,
tls_t,
http_request_url,
http_network,
http_alpn,
http_failure,
http_request_body_length,
http_request_method,
http_runtime,
http_response_body_length,
http_response_body_is_truncated,
http_response_body_sha1,
http_response_status_code,
http_response_header_location,
http_response_header_server,
http_request_redirect_from,
http_request_body_is_truncated,
http_t,
probe_analysis
FROM obs_web

CREATE TABLE obs_http_middlebox_new (
measurement_uid String,
observation_idx UInt16,
input Nullable(String),
report_id String,
measurement_start_time Datetime64(3, 'UTC'),
software_name String,
software_version String,
test_name String,
test_version String,
bucket_date String,
probe_asn UInt32,
probe_cc String,
probe_as_org_name String,
probe_as_cc String,
probe_as_name String,
network_type String,
platform String,
origin String,
engine_name String,
engine_version String,
architecture String,
resolver_ip String,
resolver_asn UInt32,
resolver_cc String,
resolver_as_org_name String,
resolver_as_cc String,
resolver_is_scrubbed UInt8,
resolver_asn_probe UInt32,
resolver_as_org_name_probe String,
created_at Nullable(Datetime64(3, 'UTC')),
hirl_sent_0 Nullable(String),
hirl_sent_1 Nullable(String),
hirl_sent_2 Nullable(String),
hirl_sent_3 Nullable(String),
hirl_sent_4 Nullable(String),
hirl_received_0 Nullable(String),
hirl_received_1 Nullable(String),
hirl_received_2 Nullable(String),
hirl_received_3 Nullable(String),
hirl_received_4 Nullable(String),
hirl_failure Nullable(String),
hirl_success Nullable(UInt8),
hfm_diff Nullable(String),
hfm_failure Nullable(String),
hfm_success Nullable(UInt8)
)
ENGINE = ReplacingMergeTree
PARTITION BY concat(substring(bucket_date, 1, 4), substring(bucket_date, 6, 2))
PRIMARY KEY (measurement_uid, observation_idx)
ORDER BY (measurement_uid, observation_idx, measurement_start_time)
SETTINGS index_granularity = 8192;

INSERT INTO obs_http_middlebox_new (
measurement_uid,
observation_idx,
input,
report_id,
measurement_start_time,
software_name,
software_version,
test_name,
test_version,
probe_asn,
probe_cc,
probe_as_org_name,
probe_as_cc,
probe_as_name,
network_type,
platform,
origin,
engine_name,
engine_version,
architecture,
resolver_ip,
resolver_asn,
resolver_cc,
resolver_as_org_name,
resolver_as_cc,
resolver_is_scrubbed,
resolver_asn_probe,
resolver_as_org_name_probe,
bucket_date,
created_at,
hirl_sent_0,
hirl_sent_1,
hirl_sent_2,
hirl_sent_3,
hirl_sent_4,
hirl_received_0,
hirl_received_1,
hirl_received_2,
hirl_received_3,
hirl_received_4,
hirl_failure,
hirl_success,
hfm_diff,
hfm_failure,
hfm_success
)
SELECT
measurement_uid,
IF(
observation_id = '', 1,
toUInt16(arraySlice(splitByChar('_', observation_id), -1, 1)[1]) + 1
) as observation_idx,
input,
report_id,
measurement_start_time,
software_name,
software_version,
test_name,
test_version,
probe_asn,
probe_cc,
probe_as_org_name,
probe_as_cc,
probe_as_name,
network_type,
platform,
origin,
engine_name,
engine_version,
architecture,
resolver_ip,
resolver_asn,
resolver_cc,
resolver_as_org_name,
resolver_as_cc,
resolver_is_scrubbed,
resolver_asn_probe,
resolver_as_org_name_probe,
bucket_date,
created_at,
hirl_sent_0,
hirl_sent_1,
hirl_sent_2,
hirl_sent_3,
hirl_sent_4,
hirl_received_0,
hirl_received_1,
hirl_received_2,
hirl_received_3,
hirl_received_4,
hirl_failure,
hirl_success,
hfm_diff,
hfm_failure,
hfm_success
FROM obs_http_middlebox

CREATE TABLE obs_web_ctrl_new (
measurement_uid String,
observation_idx UInt16,
input Nullable(String),
report_id String,
measurement_start_time Datetime64(3, 'UTC'),
software_name String,
software_version String,
test_name String,
test_version String,
bucket_date String,
hostname String,
created_at Nullable(Datetime64(3, 'UTC')),
ip String,
port Nullable(UInt16),
ip_asn Nullable(UInt32),
ip_as_org_name Nullable(String),
ip_as_cc Nullable(String),
ip_cc Nullable(String),
ip_is_bogon Nullable(UInt8),
dns_failure Nullable(String),
dns_success Nullable(UInt8),
tcp_failure Nullable(String),
tcp_success Nullable(UInt8),
tls_failure Nullable(String),
tls_success Nullable(UInt8),
tls_server_name Nullable(String),
http_request_url Nullable(String),
http_failure Nullable(String),
http_success Nullable(UInt8),
http_response_body_length Nullable(Int32)
)
ENGINE = ReplacingMergeTree
PARTITION BY concat(substring(bucket_date, 1, 4), substring(bucket_date, 6, 2))
PRIMARY KEY (measurement_uid, observation_idx)
ORDER BY (measurement_uid, observation_idx, measurement_start_time, hostname)
SETTINGS index_granularity = 8192;

INSERT INTO obs_web_ctrl_new (
measurement_uid,
observation_idx,
input,
report_id,
measurement_start_time,
software_name,
software_version,
test_name,
test_version,
hostname,
bucket_date,
created_at,
ip,
port,
ip_asn,
ip_as_org_name,
ip_as_cc,
ip_cc,
ip_is_bogon,
dns_failure,
dns_success,
tcp_failure,
tcp_success,
tls_failure,
tls_success,
tls_server_name,
http_request_url,
http_failure,
http_success,
http_response_body_length
)
SELECT
measurement_uid,
IF(
observation_id = '', 1,
toUInt16(arraySlice(splitByChar('_', observation_id), -1, 1)[1]) + 1
) as observation_idx,
input,
report_id,
measurement_start_time,
software_name,
software_version,
test_name,
test_version,
hostname,
bucket_date,
created_at,
ip,
port,
ip_asn,
ip_as_org_name,
ip_as_cc,
ip_cc,
ip_is_bogon,
dns_failure,
dns_success,
tcp_failure,
tcp_success,
tls_failure,
tls_success,
tls_server_name,
http_request_url,
http_failure,
http_success,
http_response_body_length
FROM obs_web_ctrl

@hellais hellais marked this pull request as ready for review September 13, 2024 13:30
@hellais hellais changed the title Replace observation_id with observation_idx Refactor of schema for pipeline Sep 16, 2024
@hellais hellais merged commit 18b53ab into main Sep 16, 2024
8 of 9 checks passed
@hellais hellais deleted the feature/5.0-simple branch September 16, 2024 19:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
1 participant