# -*- coding: utf-8 -*- """ Program to ============================================================ Language: Python Authors ------- Bethan Harris Assimila c/o University of Reading bethan.harris@reading.ac.uk Ross N Bannister National Centre for Earth Observation University of Reading Modification history -------------------- 08/05/2013 Created first version BH 08/05/2013 Modified/Tidied up for second version BH ============================================================ NOTES: The model used here is very crude. Play around with the parameters to see what the effects of changing different errors are and the effects of different model variables. Use the email address above for correspondance relating to this code """ ######################################################################## ## IMPORT LIBRARIES ######################################################################## import numpy as np import random import csv import matplotlib.pyplot as plt ######################################################################## ## OBJECT DEFINITIONS ######################################################################## #----------------------------------------------------------------------# #-- MODEL -------------------------------------------------------------# #----------------------------------------------------------------------# class RadModel: ''' This is the model used to calculate solar radiation and it's absorption and re-emission by the ground and atmosphere. ''' def __init__(self, _timestep): self.timestep = _timestep # Start by calling the method which sets the variables self.SetVars() def SetVars(self,definedVars={}): #define empty dictionary for variables self.vars = {} # Set the parameters/variables to their defaul value or a new one self.vars["SolarCons"] = 1361.0 #Solar Constant, assuming Earth's orbit is circular self.vars["Mass_g"] = 500.0 #Mass of the ground per unit area self.vars["Mass_a"] = 50.0 #Mass of atmospheric layer per unit area self.vars["HeatCapacity_g"]= 1000.0 #Heat capacity of the ground self.vars["HeatCapacity_a"]= 1000.0 #Heat capacity of the air self.vars["Abs_g_s"] = 0.85 #Absorbtivity of the ground in the shortwave (from sun) self.vars["Abs_g_l"] = 0.84 #Absorbtivity of the ground in the longwave (from ground) self.vars["Abs_a_s"] = 0.15 #Absorbtivity of the air in the shortwave (from sun) self.vars["Abs_a_l"] = 0.82 #Absorbtivity of the air in the longwave (from ground) self.vars["lat_deg"] = 51.5 #Latitude self.vars["Dec_solar_deg"] = 17.0 #Declination (varies at different times of year, this is for February) self.vars["SB"] = 5.67E-8 #Stefan Boltzman constant self.vars["deg2rad"] = 0.017453293 #to convert from degrees into radians # Overwrite these default parameters/variables with provided values for v in definedVars: self.vars[v]=definedVars[v] def Step(self,T_prev_g, T_prev_a, time): # make sure that T_prev_g and T_prev_a are floats to avoid matrix # multiplication issues T_prev_g = float(T_prev_g) T_prev_a = float(T_prev_a) # Take another timestep and calculate temperatures for the air and ground # Calculate the hour angle (convert to hours, origin at 12Z, in radians) hour_angle = (time/3600 - 12.0)*(15.0*self.vars["deg2rad"]) #calculate incoming solar radiation Sin_TOA = self.vars["SolarCons"] * (np.cos(hour_angle)*np.cos(self.vars["lat_deg"]*self.vars["deg2rad"])*np.cos(self.vars["Dec_solar_deg"]*self.vars["deg2rad"]) - np.sin(self.vars["lat_deg"]*self.vars["deg2rad"]) * np.sin(self.vars["Dec_solar_deg"]*self.vars["deg2rad"])) # if the sun is below the horizon then Sin_TOA is zero. if Sin_TOA <= 0.0: Sin_TOA = 0.0 # calculate Longwave emission from the concrete Sout_g_l = self.vars["SB"] * T_prev_g**4.0 # calculate Longwave emission from the air Sout_a_l = self.vars["SB"] * T_prev_a**4.0 # calculate Concrete's absorbtion of longwave radiation emitted by the air Sin_g_l = self.vars["Abs_g_l"] * Sout_a_l # calculate Concrete's absorption of shortwave radiation emitted by the sun Sin_g_s = self.vars["Abs_g_s"] * Sin_TOA # calculate Air's absorbtion of longwave radiation emitted by the concrete Sin_a_l = self.vars["Abs_a_l"] * Sout_g_l # calculate Air's absorption of shortwave radiation emitted by the sun Sin_a_s = self.vars["Abs_a_s"] * Sin_TOA # The Euler Step of calculation # a) Ground's temperature: self.T_g=T_prev_g + timestep * (Sin_g_l + Sin_g_s - Sout_g_l) / (self.vars["Mass_g"] * self.vars["HeatCapacity_g"]) # a) Air's temperature: self.T_a=T_prev_a + timestep * (Sin_a_l + Sin_a_s - Sout_a_l) / (self.vars["Mass_a"] * self.vars["HeatCapacity_a"]) #----------------------------------------------------------------------# #-- OBSERVATIONS ------------------------------------------------------# #----------------------------------------------------------------------# class Observations: ''' Object to access the observations taken from the University of Reading Atmospheric Observatory. ''' def __init__(self, pathname, lineSkip=0): ''' pathname = location of the csv file to be used lineskip (optional) = the number of lines in the header to be passed over before reading can begin ''' file = open(pathname,'rb') self.csvData = csv.reader(file) # loop over the file header and discard that info skips = np.arange(0,lineSkip) for s in skips: self.csvData.next() def readObs(self): line = self.csvData.next() ''' PULL OUT THE DATA REQUIRED Indices and formats are specific to the CSV file of observations Observations are assumed to be degrees Celcius and are transformed into degrees Kelvin ''' #read in the time from the CSV file (specific to this file) timestring = line[0] hours = int(timestring[0:2]) minutes = int(timestring[3:5]) seconds = int(timestring[6:8]) # convert into time in seconds self.time = ((hours * 60.0) + minutes) *60.0 + seconds # read in the weighted temperatures columns temp_g = float(line[4])+273.25 temp_a = float(line[5])+273.25 #add temps into matrix attribute of the object: self.temp = np.matrix([[temp_g],[temp_a]]) ######################################################################## ## DEFINITIONS ######################################################################## #-- EMPTY ARRAY DEFINITIONS -------------------------------------------# T_ground_mod = np.array([]) # Ground temperature timeseries (Kelvin): model output T_air_mod = np.array([]) # Atmos temperature timeseries (Kelvin): model output T_ground_obs = np.array([]) # Ground temperature timeseries (Kelvin): observed T_air_obs = np.array([]) # Atmos temperature timeseries (Kelvin): observed T_ground_kal = np.array([]) # Ground temperature timeseries (Kelvin): Kalman filter output T_air_kal = np.array([]) # Atmos temperature timeseries (Kelvin): Kalman filter output P_ground_mod = np.array([]) # Ground temperature covariance timeseries (Kelvin): model output P_air_mod = np.array([]) # Atmos temperature covariance timeseries (Kelvin): model output P_ground_obs = np.array([]) # Ground temperature covariance timeseries (Kelvin): observed P_air_obs = np.array([]) # Atmos temperature covariance timeseries (Kelvin): observed P_ground_kal = np.array([]) # Ground temperature covariance timeseries (Kelvin): Kalman filter output P_air_kal = np.array([]) # Atmos temperature covariance timeseries (Kelvin): Kalman filter output time_out = np.array([]) # Time timeseries (seconds) T_g_perturbed = np.array([]) # Array to hold perturbed ground temperature values (Kelvin) T_a_perturbed = np.array([]) # Array to hold perturbed air temperature values (Kelvin) perturbedVars = {} #-- DEFINE FIXED VALUES -----------------------------------------------# timestep = 300.0 # Four minutes (seconds): To coincide with observation frequency end_time = 24.0*3600.0 # One day (seconds) perturbation= 0.003 # Maximum perturbation to make to model variables (fraction) when finding process error matrix, Q variables = ("SolarCons","Mass_g","Mass_a","HeatCapacity_g","HeatCapacity_a","Abs_g_s","Abs_g_l","Abs_a_s","Abs_a_l") # catalogue of variable names obsFilename = 'Atmos_obs_data_02Feb2013.csv' #location of observations csv file delta = 0.1 # Small perturbation to temp (Kelvin): for linearising state transition matrix Q = np.matrix(np.zeros(shape=(2,2))) # A blank estimated process error matrix. H = np.matrix([(1, 0),(0, 1)]) # Observation Matrix R = np.matrix([(1.0, 0.0),(0.0, 1.0)]) # Estimated measurement error matrix I = np.matrix(np.identity(2)) # An identity matrix #-- CREATE NEW OBJECTS ------------------------------------------------# RM_independent = RadModel(timestep) # An independent model run isolated from the kalman filtering RM = RadModel(timestep) # The un-perturbed non-linear model RM_Perturbed_g = RadModel(timestep) # Model for perturbed ground temperature: for linearising state transition matrix RM_Perturbed_a = RadModel(timestep) # Model for perturbed air temperature: for linearising state transition matrix OB = Observations(obsFilename,2) #Observations object #-- DEFINE INITIAL CONDITIONS -----------------------------------------# time = 240.0 # Initial time (start 4 minutes after midnight) T_g = 275.0 # Initial Ground temperature T_a = 275.0 # initial air temperature x_previous = np.matrix([[T_g],[T_a]]) # Temperatures combined into matrix P_previous = np.matrix([(1, 0),(0, 1)]) #Estimated initial error covariance matrix RM_independent.T_g = T_g RM_independent.T_a = T_a ######################################################################## ## CALCULATIONS ######################################################################## #-- FIND ESTIMATED PROCESS MATRIX (Q)----------------------------------# # change our defined var 1000 times to simulate initial # estimate error for n in range(999): # Create a dictionary of perturbed model parameters for v in variables: perturbedVars[v]=RM.vars[v]*(1+random.uniform(-perturbation,perturbation)) # Set the model to have these perturbed prameters RM.SetVars(definedVars=perturbedVars) # Perturb input temperatures by the same perturbation x_previous_perturbed=x_previous*(1+random.uniform(-perturbation,perturbation)) #Step through the model with perturbed input temperatures and parameters RM.Step(x_previous_perturbed[0],x_previous_perturbed[1],time) #append these temperatures into vectors T_g_perturbed = np.append(T_g_perturbed, RM.T_g) T_a_perturbed = np.append(T_a_perturbed, RM.T_a) #Reset the variables to default values RM.SetVars() # Update Q matrix with the variance of these values Q[0,0] = (np.std(T_g_perturbed))**2 Q[1,1] = (np.std(T_a_perturbed))**2 # !! OPtion to fix Q at some known value.. #Q[0,0] = 0.1 #Q[1,1] = 0.1 #-- BEGIN ITERATING OVER TIMESTEPS ------------------------------------# while time