forked from IntegriChain1/s3parq
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request IntegriChain1#15 from schafrn/DC-18-extract-main-executor
DC-18 #fastrack
- Loading branch information
Showing
25 changed files
with
1,739 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
import paramiko | ||
import stat | ||
import os | ||
from typing import NamedTuple | ||
import re | ||
|
||
class FileDestination(NamedTuple):
    """Pairs a filename pattern with the label given to files that match it."""

    # Regular expression tested against a filename with ``re.match``.
    regex: str
    # Label returned for filenames matching ``regex`` (e.g. "do_move").
    file_type: str
|
||
|
||
class FileMover():
    """Small SFTP helper that lists and downloads files from a remote host.

    A paramiko transport and SFTP session are opened on construction. Use the
    instance as a context manager so both are closed on exit.
    """

    def __init__(self, secret):
        """Open the SFTP session described by ``secret``.

        Args:
            secret: object exposing ``user``, ``password``, ``host`` and
                ``port`` attributes.

        Raises:
            Whatever paramiko raises when the connection or authentication
            fails; the transport is closed before the error propagates.
        """
        # paramiko.Transport takes the address as ONE (host, port) tuple; a
        # second positional argument would be read as the window size and
        # the port would silently be ignored.
        self.transport = paramiko.Transport((secret.host, secret.port))
        try:
            self.transport.connect(username=secret.user, password=secret.password)
            self.sftp = paramiko.SFTPClient.from_transport(self.transport)
        except Exception:
            # Do not leave a half-open transport behind on failure.
            self.transport.close()
            raise

    def __enter__(self):
        """Return this instance for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the SFTP session, then the transport."""
        try:
            self.sftp.close()
        finally:
            # Always release the transport, even if closing SFTP fails.
            self.transport.close()

    def get_file(self, remote_path: str, local_path: str):
        """Download ``remote_path`` to ``local_path``.

        The local file's access and modified times are set to the remote
        modified time, for comparison to the S3 modified time.
        """
        utime = self.sftp.stat(remote_path).st_mtime
        self.sftp.get(remote_path, local_path)
        os.utime(local_path, (utime, utime))

    def get_file_type(self, filename, file_dest_map):
        """Return the ``file_type`` of the first entry whose regex matches.

        Args:
            filename: name tested with ``re.match`` (anchored at the start).
            file_dest_map: iterable of objects with ``regex`` and
                ``file_type`` attributes.

        Returns:
            The matching entry's ``file_type``, or ``'dont_move'`` when
            nothing matches.
        """
        for destination in file_dest_map:
            if re.match(destination.regex, filename):
                return destination.file_type
        return 'dont_move'

    def is_dir(self, remote_file) -> bool:
        """Return True when the remote entry's ``st_mode`` marks a directory."""
        return stat.S_ISDIR(remote_file.st_mode)

    def list_files(self, sftp_prefix: str):
        """Return the attribute entries for everything under ``sftp_prefix``."""
        return self.sftp.listdir_attr(sftp_prefix)
|
||
|
||
def get_files(tmp_dir: str, prefix: str, remote_path: str, secret):
    """Download the files directly under ``remote_path`` that start with ``prefix``.

    Directories are skipped. Each file is saved in ``tmp_dir`` under its
    remote path with "/" replaced by ".", so that files with the same name in
    different remote locations do not collide.

    Args:
        tmp_dir: local directory that receives the downloads.
        prefix: filename prefix used to build the match pattern.
        remote_path: remote directory to list.
        secret: connection details passed to ``FileMover``.
    """
    # NOTE(review): prefix is interpolated into the pattern unescaped, so any
    # regex metacharacters in it are interpreted as regex - confirm intended.
    files_dest = [FileDestination(f"^{prefix}.*$", "do_move")]

    # Open SFTP connection
    with FileMover(secret=secret) as fm:
        for remote_file in fm.list_files(remote_path):
            # Only plain files that match the filter are fetched.
            if fm.is_dir(remote_file):
                continue
            if fm.get_file_type(remote_file.filename, files_dest) == 'dont_move':
                continue

            remote_file_path = remote_path + "/" + remote_file.filename
            local_file_name = remote_file_path.replace("/", ".")
            # Drop only the "." produced by a leading "/"; slicing off the
            # first character unconditionally would truncate relative paths.
            if local_file_name.startswith("."):
                local_file_name = local_file_name[1:]
            local_file_path = os.path.join(tmp_dir, local_file_name)

            fm.get_file(remote_file_path, local_file_path)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
from core.helpers import file_mover | ||
from core.models import configuration | ||
from core import secret, contract | ||
|
||
import os | ||
import tempfile | ||
|
||
class ExtractTransform():
    """Fetch files from a remote source and publish new or changed ones.

    Files are downloaded into a temporary directory and then handed to the
    output contract when they are missing from, or newer than, what the
    contract reports.
    """

    def __init__(self, **kwargs) -> None:
        """ Performs the extraction to a given output contract.
            Valid kwargs:
                - env one of "dev", "prod", "uat"
                - transform a configuration contract instance
                - output_contract a contract instance
        """
        self.REQUIRED_PARAMS = ('env', 'output_contract', 'transform')

        # Seed every required param with None so an unset one is detectable.
        for attr in self.REQUIRED_PARAMS:
            setattr(self, attr, None)

        # Route supplied values through their setters so validation applies.
        for attr in self.REQUIRED_PARAMS:
            if attr in kwargs:
                setter = getattr(self, 'set_' + attr)
                setter(kwargs[attr])

    def set_env(self, env: str) -> None:
        """Set the environment.

        Raises:
            ValueError: if ``env`` is not one of "dev", "prod", "uat".
        """
        if env not in ('dev', 'prod', 'uat'):
            raise ValueError(f'{env} is not a valid environment')
        self.env = env

    # Project types are quoted so they are not evaluated when the class is
    # defined; they are only meaningful to readers and type checkers.
    def set_transform(self, transform: "configuration.Transformation") -> None:
        """Set the transformation whose extract configurations will be run."""
        self.transform = transform

    def set_output_contract(self, output_contract: "contract") -> None:
        """Set the contract that files are published to."""
        self.output_contract = output_contract

    def run(self):
        """Download and publish files for every extract configuration.

        Raises:
            ValueError: if a required param has not been set.
        """
        # Fail with a clear message instead of an AttributeError on None.
        self._validate_required_params()

        for config in self.transform.extract_configurations:
            # Set values from extract config
            remote_path = config.filesystem_path
            prefix = config.prefix
            secret_name = config.secret_name
            secret_type_of = config.secret_type_of

            # Fetch secret from secret contract
            # TODO: Currently configs made for FTP only, FTP type passed in directly
            source_secret = secret.Secret(name=secret_name, env=self.env, type_of=secret_type_of, mode="write")

            # Get files from remote and start pushing to s3
            with tempfile.TemporaryDirectory() as tmp_dir:
                file_mover.get_files(tmp_dir=tmp_dir, prefix=prefix, remote_path=remote_path, secret=source_secret)
                self.push_to_s3(tmp_dir, self.output_contract)

    def push_to_s3(self, tmp_dir: str, output_contract: "contract") -> None:
        """ For a local file dir, push the file to s3 if it is newer or does not exist.

        Raises:
            ValueError: if a required param has not been set.
        """
        self._validate_required_params()

        # For each local file, see (by the set metadata) if it needs to be pushed to S3 by the constraints
        for local_file in os.listdir(tmp_dir):
            local_file_path = os.path.join(tmp_dir, local_file)
            local_file_modified_time = os.stat(local_file_path).st_mtime

            if self._file_needs_update(output_contract=output_contract,
                                       local_file_path=local_file_path,
                                       local_file_modified_time=local_file_modified_time):
                output_contract.publish_raw_file(local_file_path)

    def _file_needs_update(self, output_contract: "contract", local_file_path: str, local_file_modified_time: float) -> bool:
        """ Check if file needs to be pushed
            File is only considered to need to be pushed if it does not exist or has been modified since last push

        Returns:
            True when the stored ``source_modified_time`` is older than the
            local modified time, or when it cannot be read at all.
        """
        try:
            metadata = output_contract.get_raw_file_metadata(local_file_path)
            s3_last_modified = metadata['Metadata']['source_modified_time']
            return float(s3_last_modified) < float(local_file_modified_time)
        except Exception:
            # Any lookup/parse failure is treated as "not published yet".
            # KeyboardInterrupt and SystemExit are deliberately not caught.
            return True

    def _validate_required_params(self) -> bool:
        ''' Checks that all required params are set

        Raises:
            ValueError: naming the first required param that is still None.
        '''
        for param in self.REQUIRED_PARAMS:
            # __init__ seeds every param with None, so test the value rather
            # than key presence (which is always true).
            if getattr(self, param, None) is None:
                raise ValueError(f'{param} is a required value not set for ExtractTransform.')
        return True
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
"""empty message | ||
Revision ID: c35c252fb0bc | ||
Revises: 7333d20cbb08 | ||
Create Date: 2019-01-29 13:00:53.459520 | ||
""" | ||
from alembic import op | ||
import sqlalchemy as sa | ||
|
||
|
||
# revision identifiers, used by Alembic.
revision = 'c35c252fb0bc'  # ID of this migration
down_revision = '7333d20cbb08'  # ID of the migration this one revises
branch_labels = None
depends_on = None
|
||
|
||
def upgrade():
    """Attach a descriptive comment to ``extract_configurations.secret_type_of``."""
    statement = " COMMENT ON COLUMN extract_configurations.secret_type_of IS 'represents the source type, eg. FTP, databse, S3 etc.';"
    op.get_bind().execute(statement)
|
||
|
||
def downgrade():
    """Reset the comment on ``extract_configurations.secret_type_of`` to an empty string."""
    statement = " COMMENT ON COLUMN extract_configurations.secret_type_of IS '';"
    op.get_bind().execute(statement)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,3 +12,4 @@ psycopg2==2.7.6.1 | |
alembic==1.0.6 | ||
papermill==0.17.1 | ||
jupyter==1.0.0 | ||
paramiko |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.