-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
da3dc64
commit 7ab24bf
Showing
1 changed file
with
73 additions
and
74 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,19 +4,19 @@ | |
Files to Copernicus must be on netcdf format, one file for each day of data. | ||
datasetname, csv and xml is sent to buildnetcdfs. | ||
datasetname, csv and xml is sent to buildnetcdfs. | ||
This splits the csv into day long segments and sends the segments to makenetcdf_ | ||
makenetcdf_ creates the netcdf-file and appends it together with the associated date | ||
to results, which is returned by buildnetcdfs. | ||
makenetcdf_ creates the netcdf-file and appends it together with the associated date | ||
to results, which is returned by buildnetcdfs. | ||
results is on the format [[date,bytes][date,bytes]]. | ||
Files are sent to Copernicus by FTP. | ||
Files are sent to Copernicus by FTP. | ||
Each put-request must include filename[expocode], bytes, destination. | ||
The Copernicus FTP requires an Index file and a DNT file describing all | ||
files uploaded to the server to complete the ingestion. | ||
The Copernicus FTP requires an Index file and a DNT file describing all | ||
files uploaded to the server to complete the ingestion. | ||
The index file reflects all the files in the FTP folder. | ||
The DNT file triggers the ingestion. | ||
The DNT file triggers the ingestion. | ||
A DNT file must be submitted for the index-file as well. | ||
Example of DNT file format provided by mail from [email protected] 2019-03-07 | ||
|
@@ -31,27 +31,27 @@ | |
</file> | ||
</dataset> | ||
</delivery> | ||
</delivery> | ||
Example of index file format provided by [email protected] 2019-03-06 | ||
# Title : Carbon in-situ observations catalog | ||
# Description : catalog of available in-situ observations per platform. | ||
# Project : Copernicus | ||
# Format version : 1.0 | ||
# Date of update : 20190305080103 | ||
# catalog_id,file_name,geospatial_lat_min,geospatial_lat_max,geospatial_lon_min,geospatial_lon_max,time_coverage_start,time_coverage_end,provider,date_update,data_mode,parameters | ||
# Title : Carbon in-situ observations catalog | ||
# Description : catalog of available in-situ observations per platform. | ||
# Project : Copernicus | ||
# Format version : 1.0 | ||
# Date of update : 20190305080103 | ||
# catalog_id,file_name,geospatial_lat_min,geospatial_lat_max,geospatial_lon_min,geospatial_lon_max,time_coverage_start,time_coverage_end,provider,date_update,data_mode,parameters | ||
COP-GLOBAL-01,ftp://nrt.cmems-du.eu/Core/INSITU_GLO_CARBON_NRT_OBSERVATIONS_013_049/nrt/latest/20190221/GL_LATEST_PR_BA_7JXZ_20190221.nc,19.486,19.486,-176.568,-176.568,2019-02-21T17:50:00Z,2019-02-21T17:50:00Z,Unknown institution,2019-02-24T04:10:11Z,R,DEPH TEMP | ||
''' | ||
import logging | ||
import logging | ||
import pysftp | ||
import ftplib | ||
import ftputil | ||
import ftputil | ||
import os | ||
import sys | ||
import hashlib | ||
import datetime | ||
import netCDF4 | ||
from py_func.cmems_converter import buildnetcdfs | ||
from py_func.cmems_converter import buildnetcdfs | ||
|
||
import xml.etree.ElementTree as ET | ||
import sqlite3 as lite | ||
|
@@ -78,24 +78,24 @@ | |
|
||
#con = lite.connect('uploaded_files.db') | ||
#cur = con.cursor() | ||
#cur.execute(''' CREATE TABLE uploads | ||
#cur.execute(''' CREATE TABLE uploads | ||
# (date text, filename text, time_coverage_start text, time_coverage_end text, hashsum text)''') | ||
|
||
|
||
def send_to_copernicus( | ||
filename,dataset_zip,dataset_name,destination,ftp_config,server, | ||
delete_file=False): | ||
''' | ||
connects to copernicus server, creates and uploads netCDF4-files and | ||
''' | ||
connects to copernicus server, creates and uploads netCDF4-files and | ||
corresponding dnt-file | ||
csv_file: file retrieved from QuinCe | ||
dataset_name: dataset name | ||
ftp_config contains login-information, server-information and filepaths. | ||
server denotes if the file should be uploaded to the 'near real time'-server | ||
or the 'multi year'-server. | ||
server denotes if the file should be uploaded to the 'near real time'-server | ||
or the 'multi year'-server. | ||
delete_file indicates that the file should be deleted from the Copernicus server, | ||
triggers the sending of a DNT 'delete'-file. | ||
triggers the sending of a DNT 'delete'-file. | ||
returns: upload results | ||
|
@@ -108,9 +108,9 @@ def send_to_copernicus( | |
logging.info( | ||
'Creating netcdf-files based on {:s} to send to Copernicus' | ||
.format(filename)) | ||
|
||
folder_date = (datetime.datetime.now().strftime("%Y%m%d_%H%M%S")) | ||
try: | ||
try: | ||
os.mkdir(folder_date) | ||
except: pass | ||
uploaded_to_ftp = True | ||
|
@@ -119,7 +119,7 @@ def send_to_copernicus( | |
results = {} | ||
hashsum_no_date={} | ||
files_uploaded = False | ||
with open(csv_file) as f: | ||
with open(csv_file) as f: | ||
csv = f.read() | ||
nc_files = buildnetcdfs(dataset_name,csv) | ||
|
||
|
@@ -135,18 +135,18 @@ def send_to_copernicus( | |
|
||
for nc_file in nc_files: #nc_file = [filename, bytes] | ||
nc_filename = nc_file[0] | ||
nc_content = nc_file[1] | ||
nc_filepath = folder_date + '/' + nc_filename + '.nc' | ||
nc_content = nc_file[1] | ||
nc_filepath = folder_date + '/' + nc_filename + '.nc' | ||
with open(nc_filepath,'wb') as f: f.write(nc_content) | ||
#with open(folder_date+'/' + nc_nan,'wb') as f: f.write(nc_content) | ||
|
||
hashsum_no_date[nc_filename] = get_netCDF_hashsum_no_date(nc_filepath) | ||
|
||
try: | ||
with open(log_latest,'r') as log: | ||
with open(log_latest,'r') as log: | ||
uploads_latest = eval(log.read()) | ||
|
||
if nc_filename in uploads_latest: | ||
if nc_filename in uploads_latest: | ||
logging.debug('filename already uploaded') | ||
logging.debug('old checksum: ' + str(uploads_latest[nc_filename])) | ||
logging.debug('new checksum: ' + str(hashsum_no_date[nc_filename])) | ||
|
@@ -166,25 +166,25 @@ def send_to_copernicus( | |
|
||
upload_result, ftp_filepath, start_upload_time, stop_upload_time \ | ||
= upload_to_ftp(ftp,ftp_config,nc_filename, nc_filepath) | ||
if upload_result is 0: | ||
|
||
if upload_result is 0: | ||
results[nc_filename]='netCDF upload ok ' | ||
files_uploaded = True | ||
else: | ||
else: | ||
results[nc_filename]='no netCDF uploaded' | ||
uploaded_to_ftp = False | ||
uploaded_to_ftp = False | ||
|
||
if upload_result is 0 or delete_file == True: | ||
ftp_filepath = ftp_filepath.split('NRT_201904/')[-1] | ||
#dnt_folder = ftp_config['Copernicus']['dnt_dir'] | ||
#dnt_folder = ftp_config['Copernicus']['dnt_dir'] | ||
dnt_filename, dnt_filepath = DNT_create( | ||
ftp_filepath, start_upload_time, stop_upload_time, folder_date, | ||
delete_file) | ||
# If a DNT file with the same timestamp/filename already exists, | ||
|
||
# If a DNT file with the same timestamp/filename already exists, | ||
# create a new DNT-file. | ||
dnt_list = ftp.listdir(dnt_dir) | ||
if dnt_filename in dnt_list: | ||
if dnt_filename in dnt_list: | ||
logging.debug('Duplicate DNT filename occured') | ||
dnt_filename, dnt_filepath = \ | ||
DNT_create(ftp_filepath,start_upload_time, | ||
|
@@ -193,10 +193,10 @@ def send_to_copernicus( | |
dnt_upload_list += [[nc_filename, | ||
dnt_filepath, | ||
dnt_dir+'/'+dnt_filename]] | ||
elif upload_result is 2: | ||
|
||
elif upload_result is 2: | ||
results[nc_filename] = 'netCDF file exists' | ||
else: | ||
else: | ||
results[nc_filename] = 'netCDF: No response' | ||
uploaded_to_ftp = False | ||
|
||
|
@@ -205,14 +205,14 @@ def send_to_copernicus( | |
uploaded_to_ftp = False | ||
|
||
for nc_filename, dnt_source, dnt_target in dnt_upload_list: | ||
try: | ||
try: | ||
ftp.upload(dnt_source,dnt_target) | ||
results[nc_filename] += ', DNT upload ok' | ||
except: | ||
except: | ||
results[nc_filename] += ', DNT: No response' | ||
logging.debug('DNT: No response') | ||
|
||
# get response file and generate upload-log and report-log | ||
# get response file and generate upload-log and report-log | ||
if files_uploaded == True: | ||
upload_response_log, uploads_latest = evaluate_response_file( | ||
dnt_upload_list,ftp,folder_date, | ||
|
@@ -228,25 +228,25 @@ def send_to_copernicus( | |
if exists: | ||
with open(log_file,'a') as log: log.write(upload_response_log) | ||
else: | ||
with open(log_file,'w') as log: | ||
with open(log_file,'w') as log: | ||
log.write('upload_date, netCDF filename, DNT filename, DNT response \n') | ||
log.write(upload_response_log) | ||
with open(log_latest,'w') as log: | ||
|
||
with open(log_latest,'w') as log: | ||
json.dump(uploads_latest,log) | ||
if uploaded_to_ftp: | ||
|
||
if uploaded_to_ftp: | ||
return dataset_name + ': successfully uploaded to the CMEMS FTP server' | ||
else: | ||
else: | ||
return dataset_name + ': failed correct upload procedure to the CMEMS FTP server' | ||
|
||
|
||
def upload_to_ftp(ftp, ftp_config, filename, filepath): | ||
''' Uploads file with location 'filepath' to an ftp-server, | ||
server-location set by 'directory' parameter and config-file, | ||
''' Uploads file with location 'filepath' to an ftp-server, | ||
server-location set by 'directory' parameter and config-file, | ||
ftp is the ftp-connection | ||
returns | ||
returns | ||
upload_result: upload_ok or file_exists | ||
dest_filepath: target filepath on ftp-server | ||
start_upload_time and stop_upload_time: timestamps of upload process | ||
|
@@ -272,21 +272,21 @@ def upload_to_ftp(ftp, ftp_config, filename, filepath): | |
|
||
def DNT_create( | ||
filepath,start_upload_time,stop_upload_time,folder_date,delete_file=False): | ||
''' Generates delivery note for NetCDF file upload, | ||
''' Generates delivery note for NetCDF file upload, | ||
note needed by Copernicus in order to move .nc-file to public-ftp | ||
filepath is name of file that has been uploaded to the server | ||
start_upload_time and stop_upload_time are the timestamps associated with | ||
start_upload_time and stop_upload_time are the timestamps associated with | ||
file upload | ||
returns the filename and filepath of the dnt-file | ||
The parameter delete_file can be set to True or False. | ||
The parameter delete_file can be set to True or False. | ||
If set to True, the dnt file will request the deletion of filename. | ||
''' | ||
product_id = 'INSITU_GLO_CARBON_NRT_OBSERVATIONS_013_049' | ||
date = datetime.datetime.now().strftime(dnt_datetime_format) | ||
|
||
with open(folder_date +'/'+ filepath.split('/')[-1],'rb') as f: | ||
with open(folder_date +'/'+ filepath.split('/')[-1],'rb') as f: | ||
nc_bytes = f.read() | ||
|
||
dnt = ET.Element('delivery') | ||
|
@@ -315,7 +315,7 @@ def DNT_create( | |
dnt_file = product_id + '_P' + date + '.xml' | ||
dnt_filepath = folder_date + '/' + dnt_file | ||
|
||
with open(dnt_filepath,'wb') as xml: | ||
with open(dnt_filepath,'wb') as xml: | ||
xml_tree.write(xml,xml_declaration=True,method='xml') | ||
|
||
return dnt_file, dnt_filepath | ||
|
@@ -325,13 +325,13 @@ def index_file(ftp,dnt_dir,folder_date,upload_result): | |
''' | ||
Creates index file to be updated in CMEMS FTP-server | ||
Describes all files uploaded to the FTP folder during this session | ||
requires an ftp connection, the local folder for the current session | ||
requires an ftp connection, the local folder for the current session | ||
and the upload status of the files associated with this session. | ||
returns the success/failure status of the index-upload-procedure | ||
''' | ||
index_status = 'index upload ok' | ||
index_info = '' | ||
|
||
dir_list = ftp.listdir(index_dir+'/latest') | ||
|
||
date_header = datetime.datetime.now().strftime('%Y%m%d%H%M%S') | ||
|
@@ -369,30 +369,29 @@ def index_file(ftp,dnt_dir,folder_date,upload_result): | |
parameters += item +' ' | ||
parameters = parameters[:-1] #removes final space | ||
|
||
index_info += ('COP-GLOBAL-01' + ',' + (server_location + file) + ',' | ||
+ lat_min + ',' + lat_max + ',' + lon_min + ',' + lon_max | ||
+ ',' + time_start + ',' + time_end + ',' | ||
+ 'University of Bergen Geophysical Institute' + ',' | ||
index_info += ('COP-GLOBAL-01' + ',' + (server_location + file) + ',' | ||
+ lat_min + ',' + lat_max + ',' + lon_min + ',' + lon_max | ||
+ ',' + time_start + ',' + time_end + ',' | ||
+ 'University of Bergen Geophysical Institute' + ',' | ||
+ date_update + ',' + 'R' + ',' + parameters + '\n') | ||
|
||
index_latest = index_header + index_info | ||
index_filename = folder_date +'/index_latest.txt' | ||
with open(index_filename,'w') as f: f.write(index_latest) | ||
|
||
ftp_index_location = index_dir+'/index_latest.txt' | ||
logging.debug('index file:\n'+index_latest) | ||
|
||
try: | ||
try: | ||
start_upload_time = datetime.datetime.now().strftime(dnt_datetime_format) | ||
ftp.upload(index_filename, ftp_index_location) | ||
stop_upload_time = datetime.datetime.now().strftime(dnt_datetime_format) | ||
|
||
try: | ||
dnt_file, dnt_filepath = DNT_create( | ||
index_filename.split('/')[-1], | ||
index_latest.encode('utf-8'), | ||
start_upload_time, | ||
stop_upload_time, | ||
stop_upload_time, | ||
folder_date) | ||
|
||
ftp.upload(dnt_filepath,dnt_dir+'/'+dnt_file) | ||
|
@@ -409,7 +408,7 @@ def get_response(ftp,dnt_filename,folder_date): | |
Function to retrieve the status of any file uploaded to CMEMS server | ||
requires login information and the filename of the DNT associated with the upload. | ||
returns the string of the xml responsefile generated by the CMEMS server. | ||
returns the string of the xml responsefile generated by the CMEMS server. | ||
''' | ||
source = dnt_filename.split('.')[0].replace('DNT','DNT_response') + '_response.xml' | ||
target = folder_date + '/' + source.split('/')[-1] | ||
|
@@ -434,8 +433,8 @@ def evaluate_response_file(dnt_upload_list,ftp,folder_date,upload_response_log,u | |
try: | ||
cmems_response = get_response(ftp,dnt_target,folder_date) | ||
logging.debug('cmems response: ' + cmems_response) | ||
upload_response_log += ( | ||
folder_date + ',' + nc_filename + ',' | ||
upload_response_log += ( | ||
folder_date + ',' + nc_filename + ',' | ||
+ dnt_source + ',' + cmems_response + '\n' ) | ||
if 'Ingested="True"' in cmems_response: | ||
path = os.getcwd() | ||
|
@@ -452,7 +451,7 @@ def evaluate_response_file(dnt_upload_list,ftp,folder_date,upload_response_log,u | |
return upload_response_log, uploads_latest | ||
|
||
def get_netCDF_hashsum_no_date(nc_path): | ||
with open(nc_path,'rb') as f: | ||
with open(nc_path,'rb') as f: | ||
nc_bytes = f.read() | ||
hashsum_no_date = hashlib.md5(nc_bytes).hexdigest() | ||
return hashsum_no_date | ||
return hashsum_no_date |