import Nio
import pandas as pd
import os
import pickle
import subprocess
import numpy as np
from scipy.interpolate import RectBivariateSpline
from netCDF4 import Dataset, date2num
from datetime import datetime, timedelta
from pycaps.util.make_proj_grids import read_arps_map_file, make_proj_grids
from multiprocessing import Pool
import warnings
import traceback
import argparse


def MRMS_main():
warnings.simplefilter("ignore")
parser = argparse.ArgumentParser()
parser.add_argument("-s", "--start", required=True, help="Date to begin aggregation in YYYYMMDDHHMM format")
parser.add_argument("-e", "--end", required=True, help="Date to end aggregation in YYYYMMDDHHMM format")
parser.add_argument("-p", "--path", required=True, help="Path to MRMS GRIB2 files")
parser.add_argument("-o", "--out", required=True, help="Path to save MRMS netCDF4 files")
parser.add_argument("-m", "--map", required=True, help="File containing map coordinates for interpolation")
parser.add_argument("-v", "--var", required=True, help="Comma-separated list of variables (example: MESH_MAX_60min_00.50)")
parser.add_argument("-n", "--np", type=int, default=1, help="Number of processors to use")
args = parser.parse_args()
start_date = datetime.strptime(args.start, "%Y%m%d%H%M")
end_date = datetime.strptime(args.end, "%Y%m%d%H%M")
mrms_path = args.path
out_path = args.out
map_filename = args.map
variables = args.var.split(",")
num_procs = args.np
#start_date = datetime(2015, 9, 29)
#end_date = datetime(2015, 9, 30)
#mrms_path = "/sharp/djgagne/mrms/"
#out_path = "/sharp/djgagne/mrms_ncar/"
#map_filename = "/sharp/djgagne/ncar_map_grids/rt2015_grid.nc"
#variables = ["MESH_Max_60min_00.50", "RotationTrack60min_00.50", "RadarQualityIndex_00.00"]
#variables = ["MESH_Max_60min_00.50", "RadarQualityIndex_00.00"]
#num_procs = 20
if num_procs > 1:
pool = Pool(num_procs)
curr_date = start_date
while curr_date <= end_date:
for variable in variables:
pool.apply_async(interpolate_mrms_day, (curr_date, variable, mrms_path, map_filename, out_path))
curr_date += timedelta(days=1)
pool.close()
pool.join()
else:
curr_date = start_date
while curr_date <= end_date:
for variable in variables:
                interpolate_mrms_day(curr_date, variable, mrms_path, map_filename, out_path)
curr_date += timedelta(days=1)
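
# Example invocation (a sketch only; the script filename and paths below are illustrative
# assumptions, not part of this module; dates and variables follow the argparse help above):
#     python mrms_interpolate.py -s 201509290000 -e 201509300000 \
#         -p /data/mrms/ -o /data/mrms_interp/ -m /data/maps/ncar_grid.nc \
#         -v MESH_Max_60min_00.50,RadarQualityIndex_00.00 -n 4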


def load_map_coordinates(map_file):
"""
Loads map coordinates from netCDF or pickle file created by util.makeMapGrids.
:param map_file: Filename for the file containing coordinate information.
:returns: Latitude and longitude grids as numpy arrays.
"""
if map_file[-4:] == ".pkl":
        map_data = pickle.load(open(map_file, "rb"))
lon = map_data['lon']
lat = map_data['lat']
else:
map_data = Dataset(map_file)
if "lon" in map_data.variables.keys():
lon = map_data.variables['lon'][:]
lat = map_data.variables['lat'][:]
else:
lon = map_data.variables["XLONG"][0]
lat = map_data.variables["XLAT"][0]
return lon, lat
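
# Usage sketch (the filename below is hypothetical; any netCDF file exposing "lon"/"lat"
# or "XLONG"/"XLAT" variables, or a pickle file with "lon"/"lat" keys, works):
#     lon_grid, lat_grid = load_map_coordinates("/data/maps/ncar_grid.nc")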


def interpolate_mrms_day(start_date, variable, mrms_path, map_filename, out_path):
"""
    For a given day, this function interpolates hourly MRMS data to a specified latitude and
    longitude grid, and saves the interpolated grids to CF-compliant netCDF4 files.

Parameters
----------
start_date : datetime.datetime
Date of data being interpolated
variable : str
MRMS variable
mrms_path : str
Path to top-level directory of MRMS GRIB2 files
map_filename : str
Name of the map filename. Supports ARPS map file format and netCDF files containing latitude
and longitude variables
out_path : str
Path to location where interpolated netCDF4 files are saved.
"""
try:
        print(start_date, variable)
end_date = start_date + timedelta(hours=23)
mrms = MRMSGrid(start_date, end_date, variable, mrms_path)
if mrms.data is not None:
if map_filename[-3:] == "map":
mapping_data = make_proj_grids(*read_arps_map_file(map_filename))
mrms.interpolate_to_netcdf(mapping_data['lon'], mapping_data['lat'], out_path)
else:
lon, lat = load_map_coordinates(map_filename)
mrms.interpolate_to_netcdf(lon, lat, out_path)
except Exception as e:
# This exception catches any errors when run in multiprocessing, prints the stack trace,
# and ends the process. Otherwise the process will stall.
        print(traceback.format_exc())
raise e
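
# Usage sketch for a single day and variable (the paths are assumptions for illustration):
#     interpolate_mrms_day(datetime(2015, 9, 29), "MESH_Max_60min_00.50",
#                          "/data/mrms/", "/data/maps/ncar_grid.nc", "/data/mrms_interp/")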


class MRMSGrid(object):
"""
MRMSGrid reads time series of MRMS grib2 files, interpolates them, and outputs them to netCDF4 format.
"""
def __init__(self, start_date, end_date, variable, path_start, freq="1H"):
self.start_date = start_date
self.end_date = end_date
self.variable = variable
self.path_start = path_start
self.freq = freq
self.data = None
        self.all_dates = pd.date_range(start=self.start_date, end=self.end_date, freq=self.freq)
self.loaded_dates = None
self.lon = None
self.lat = None
self.load_data()

    def load_data(self):
"""
        Loads data from MRMS GRIB2 files, decompressing gzip-compressed files before reading and
        re-compressing them afterward.
"""
data = []
loaded_dates = []
loaded_indices = []
for t, timestamp in enumerate(self.all_dates):
date_str = timestamp.date().strftime("%Y%m%d")
full_path = self.path_start + date_str + "/"
if self.variable in os.listdir(full_path):
full_path += self.variable + "/"
data_files = sorted(os.listdir(full_path))
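                # The trailing token of each filename is assumed to hold the valid time; the
                # first 13 characters (YYYYMMDD-HHMM) are parsed into a timestamp for matching.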
file_dates = pd.to_datetime([d.split("_")[-1][0:13] for d in data_files])
if timestamp in file_dates:
data_file = data_files[np.where(timestamp==file_dates)[0][0]]
                    print(full_path + data_file)
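                    # gzipped GRIB2 files are decompressed in place before reading and
                    # re-compressed once the grid has been loaded (see below).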
if data_file[-2:] == "gz":
subprocess.call(["gunzip", full_path + data_file])
file_obj = Nio.open_file(full_path + data_file[:-3])
else:
file_obj = Nio.open_file(full_path + data_file)
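                    # The data variable's GRIB name varies by product; it is assumed to sort
                    # ahead of the lat_0/lon_0 coordinate variables, so the first sorted key
                    # selects it.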
var_name = sorted(file_obj.variables.keys())[0]
data.append(file_obj.variables[var_name][:])
if self.lon is None:
self.lon = file_obj.variables["lon_0"][:]
# Translates longitude values from 0:360 to -180:180
if np.count_nonzero(self.lon > 180) > 0:
self.lon -= 360
self.lat = file_obj.variables["lat_0"][:]
file_obj.close()
if data_file[-2:] == "gz":
subprocess.call(["gzip", full_path + data_file[:-3]])
else:
subprocess.call(["gzip", full_path + data_file])
loaded_dates.append(timestamp)
loaded_indices.append(t)
if len(loaded_dates) > 0:
self.loaded_dates = pd.DatetimeIndex(loaded_dates)
self.data = np.ones((self.all_dates.shape[0], data[0].shape[0], data[0].shape[1])) * -9999
self.data[loaded_indices] = np.array(data)

    def interpolate_grid(self, in_lon, in_lat):
"""
Interpolates MRMS data to a different grid using cubic bivariate splines
"""
out_data = np.zeros((self.data.shape[0], in_lon.shape[0], in_lon.shape[1]))
for d in range(self.data.shape[0]):
print "Loading ", d, self.variable, self.start_date
if self.data[d].max() > -999:
step = self.data[d]
step[step < 0] = 0
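                # RectBivariateSpline requires strictly ascending coordinates, so the latitude
                # axis (and the data rows) are reversed when the grid is stored north-to-south.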
if self.lat[-1] < self.lat[0]:
spline = RectBivariateSpline(self.lat[::-1], self.lon, step[::-1], kx=3, ky=3)
else:
spline = RectBivariateSpline(self.lat, self.lon, step, kx=3, ky=3)
print "Evaluating", d, self.variable, self.start_date
flat_data = spline.ev(in_lat.ravel(), in_lon.ravel())
out_data[d] = flat_data.reshape(in_lon.shape)
del spline
else:
print d, " is missing"
out_data[d] = -9999
return out_data

    def interpolate_to_netcdf(self, in_lon, in_lat, out_path, date_unit="seconds since 1970-01-01T00:00"):
"""
        Calls the interpolation function and then saves the MRMS data to a netCDF file. A separate
        output directory is created for each variable if one does not already exist.
"""
out_data = self.interpolate_grid(in_lon, in_lat)
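        # Create a per-variable output directory if it does not exist; the try/except guards
        # against a race when multiple processes try to create the same directory.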
if not os.access(out_path + self.variable, os.R_OK):
try:
os.mkdir(out_path + self.variable)
except OSError:
                print(out_path + self.variable + " already created")
out_file = out_path + self.variable + "/" + "{0}_{1}_{2}.nc".format(self.variable,
self.start_date.strftime("%Y%m%d-%H:%M"),
self.end_date.strftime("%Y%m%d-%H:%M"))
out_obj = Dataset(out_file, "w")
out_obj.createDimension("time", out_data.shape[0])
out_obj.createDimension("y", out_data.shape[1])
out_obj.createDimension("x", out_data.shape[2])
data_var = out_obj.createVariable(self.variable, "f4", ("time", "y", "x"), zlib=True,
fill_value=-9999.0,
least_significant_digit=3)
data_var[:] = out_data
data_var.long_name = self.variable
data_var.coordinates = "latitude longitude"
if "MESH" in self.variable or "QPE" in self.variable:
data_var.units = "mm"
elif "Reflectivity" in self.variable:
data_var.units = "dBZ"
elif "Rotation" in self.variable:
data_var.units = "s-1"
else:
data_var.units = ""
out_lon = out_obj.createVariable("longitude", "f4", ("y", "x"), zlib=True)
out_lon[:] = in_lon
out_lon.units = "degrees_east"
out_lat = out_obj.createVariable("latitude", "f4", ("y", "x"), zlib=True)
out_lat[:] = in_lat
out_lat.units = "degrees_north"
dates = out_obj.createVariable("time", "i8", ("time",), zlib=True)
dates[:] = np.round(date2num(self.all_dates.to_pydatetime(), date_unit)).astype(np.int64)
dates.long_name = "Valid date"
dates.units = date_unit
        out_obj.Conventions = "CF-1.6"
out_obj.close()
return
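

# Example of driving MRMSGrid directly (dates, variable name, and paths below are illustrative
# assumptions; the lon/lat grids would typically come from load_map_coordinates or
# make_proj_grids):
#     mrms = MRMSGrid(datetime(2015, 9, 29), datetime(2015, 9, 29, 23),
#                     "MESH_Max_60min_00.50", "/data/mrms/")
#     mrms.interpolate_to_netcdf(lon, lat, "/data/mrms_interp/")
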
if __name__ == "__main__":
MRMS_main()