Skip to content

Commit

Permalink
Merge pull request #122 from thetisproject/than_tracer
Browse files Browse the repository at this point in the history
Adding 2-D tracer transport in thetis
  • Loading branch information
thangel authored Feb 22, 2018
2 parents 086f153 + 68208a0 commit 0e9f313
Show file tree
Hide file tree
Showing 13 changed files with 1,248 additions and 11 deletions.
128 changes: 128 additions & 0 deletions test/tracerEq/test_consistency_2d.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Tracer box in 2D
# ================
#
# Solves a standing wave in a rectangular basin using wave equation.
#
# This version uses a constant tracer to check local/global conservation of tracers.
#
# Initial condition for elevation corresponds to a standing wave.
# Time step and export interval are chosen based on theorethical
# oscillation frequency. Initial condition repeats every 20 exports.
#
#
# Thanasis Angeloudis based on Tuomas Karna test_consistency.py 15-01-2018

from thetis import *

def run_tracer_consistency(constant_c = True, **model_options):

t_cycle = 2000.0 # standing wave period
depth = 50.0 # average depth
lx = np.sqrt(9.81*depth)*t_cycle # wave length
ly = 3000.0
nx = 18
ny = 2
mesh2d = RectangleMesh(nx, ny, lx, ly)
tracer_value = 4.5
elev_amp = 2.0
# estimate of max advective velocity used to estimate time step
u_mag = Constant(1.0)

outputdir = 'outputs'

# bathymetry
p1_2d = FunctionSpace(mesh2d, 'CG', 1)
bathymetry_2d = Function(p1_2d, name='Bathymetry')
x_2d, y_2d = SpatialCoordinate(mesh2d)
# non-trivial bathymetry, to properly test 2d tracer conservation
bathymetry_2d.interpolate(depth + depth/10.*sin(x_2d/lx*pi))


# set time step, export interval and run duration
n_steps = 8
t_export = round(float(t_cycle/n_steps))
# for testing tracer conservation, we don't want to come back to the initial condition
t_end = 2.5*t_cycle

# create solver
solver_obj = solver2d.FlowSolver2d(mesh2d, bathymetry_2d)
options = solver_obj.options
options.use_nonlinear_equations = True
options.solve_tracer = True
options.use_limiter_for_tracers = not constant_c
options.simulation_export_time = t_export
options.simulation_end_time = t_end
options.horizontal_velocity_scale = Constant(u_mag)
options.check_volume_conservation_2d = True
options.check_tracer_conservation = True
options.check_tracer_overshoot = True
options.timestepper_type = 'CrankNicolson'
options.output_directory = outputdir
options.fields_to_export = ['uv_2d', 'elev_2d', 'tracer_2d']
options.update(model_options)

if not options.no_exports:
print_output('Exporting to {:}'.format(options.output_directory))

solver_obj.create_function_spaces()
elev_init = Function(solver_obj.function_spaces.H_2d)
elev_init.project(-elev_amp*cos(2*pi*x_2d/lx))

tracer_init2d = None
if options.solve_tracer and constant_c:
tracer_init2d = Function(solver_obj.function_spaces.Q_2d, name='initial tracer')
tracer_init2d.assign(tracer_value)
if options.solve_tracer and not constant_c:
tracer_init2d = Function(solver_obj.function_spaces.Q_2d, name='initial tracer')
tracer_l = 0
tracer_r = 30.0
tracer_init2d.interpolate(tracer_l + (tracer_r - tracer_l)*0.5*(1.0 + sign(x_2d - lx/4)))

solver_obj.assign_initial_conditions(elev=elev_init, tracer=tracer_init2d )
solver_obj.iterate()

# TODO do these checks every export ...
vol2d, vol2d_rerr = solver_obj.callbacks['export']['volume2d']()
assert vol2d_rerr < 1e-10, '2D volume is not conserved'
if options.solve_tracer:
tracer_int, tracer_int_rerr = solver_obj.callbacks['export']['tracer_2d mass']()
assert abs(tracer_int_rerr) < 1e-4, 'tracer is not conserved'
smin, smax, undershoot, overshoot = solver_obj.callbacks['export']['tracer_2d overshoot']()
max_abs_overshoot = max(abs(undershoot), abs(overshoot))
overshoot_tol = 1e-12
msg = 'Tracer overshoots are too large: {:}'.format(max_abs_overshoot)
assert max_abs_overshoot < overshoot_tol, msg



def test_const_tracer():
"""
Test CrankNicolson timeintegrator without slope limiters
Constant tracer, should remain constant
"""
run_tracer_consistency(constant_c= True,
use_nonlinear_equations=True,
solve_tracer=True,
use_limiter_for_tracers=False,
no_exports=True)



def test_nonconst_tracer():
"""
Test CrankNicolson timeintegrator with slope limiters
Non-trivial tracer, should see no overshoots and be conserved
"""
run_tracer_consistency(constant_c= False,
use_nonlinear_equations=True,
solve_tracer=True,
use_limiter_for_tracers=True,
no_exports=True)


if __name__ == '__main__':
run_tracer_consistency(constant_c= False,
use_nonlinear_equations=True,
solve_tracer=True,
use_limiter_for_tracers=False,
no_exports=False)
2 changes: 1 addition & 1 deletion test/tracerEq/test_h-advection_mes.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def export_func():
next_export_t += solverobj.options.simulation_export_time
iexport += 1

# project analytical solultion on high order mesh
# project analytical solution on high order mesh
t_const.assign(t)
salt_ana_ho.project(ana_salt_expr)
# compute L2 norm
Expand Down
198 changes: 198 additions & 0 deletions test/tracerEq/test_h-advection_mes_2d.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
"""
Testing 2D horizontal advection of tracers
Tuomas Karna, edited by Athanasios Angeloudis
"""
from thetis import *
import numpy
from scipy import stats
import pytest


def run(refinement, **model_options):
print_output('--- running refinement {:}'.format(refinement))

# domain dimensions - channel in x-direction
lx = 15.0e3
ly = 6.0e3/refinement
area = lx*ly
depth = 40.0
u = 1.0

# mesh
nx = 6*refinement + 1
ny = 1 # constant -- channel
mesh2d = RectangleMesh(nx, ny, lx, ly)

# simulation run time
t_end = 3000.0
# initial time
t_init = 0.0
t_export = (t_end - t_init)/8.0

# outputs
outputdir = 'outputs'

# bathymetry
P1_2d = FunctionSpace(mesh2d, 'CG', 1)
bathymetry_2d = Function(P1_2d, name='Bathymetry')
bathymetry_2d.assign(depth)

solverobj = solver2d.FlowSolver2d(mesh2d, bathymetry_2d)
options = solverobj.options
options.use_nonlinear_equations = False
options.use_lax_friedrichs_velocity = True
options.lax_friedrichs_velocity_scaling_factor = Constant(1.0)
options.use_lax_friedrichs_tracer = False
options.horizontal_velocity_scale = Constant(abs(u))
options.no_exports = True
options.output_directory = outputdir
options.simulation_end_time = t_end
options.simulation_export_time = t_export
options.solve_tracer = True
options.use_limiter_for_tracers = True
options.fields_to_export = ['tracer_2d']
options.update(model_options)
uv_expr = as_vector((u, 0))
bnd_salt_2d = {'value': Constant(0.0), 'uv': uv_expr}
bnd_uv_2d = {'uv': uv_expr}
solverobj.bnd_functions['tracer'] = {
1: bnd_salt_2d,
2: bnd_salt_2d,
}
solverobj.bnd_functions['momentum'] = {
1: bnd_uv_2d,
2: bnd_uv_2d,
}

solverobj.create_equations()

t = t_init # simulation time

x0 = 0.3*lx
sigma = 1600.
xy = SpatialCoordinate(solverobj.mesh2d)
t_const = Constant(t)
ana_tracer_expr = exp(-(xy[0] - x0 - u*t_const)**2/sigma**2)
tracer_ana = Function(solverobj.function_spaces.H_2d, name='tracer analytical')
uv_init = Function(solverobj.function_spaces.U_2d, name='initial uv')
uv_init.project(uv_expr)
solverobj.assign_initial_conditions(uv=uv_init, tracer=ana_tracer_expr)

# export analytical solution
if not options.no_exports:
out_tracer_ana = File(os.path.join(options.output_directory, 'tracer_ana.pvd'))

def export_func():
if not options.no_exports:
solverobj.export()
# update analytical solution to correct time
t_const.assign(t)
out_tracer_ana.write(tracer_ana.project(ana_tracer_expr))

# export initial conditions
export_func()

# custom time loop that solves tracer equation only
ti = solverobj.timestepper.timesteppers.tracer

i = 0
iexport = 1
next_export_t = t + solverobj.options.simulation_export_time
while t < t_end - 1e-8:
ti.advance(t)
t += solverobj.dt
i += 1
if t >= next_export_t - 1e-8:
print_output('{:3d} i={:5d} t={:8.2f} s salt={:8.2f}'.format(iexport, i, t, norm(solverobj.fields.tracer_2d)))
export_func()
next_export_t += solverobj.options.simulation_export_time
iexport += 1

# project analytical solution on high order mesh
t_const.assign(t)

# compute L2 norm
l2_err = errornorm(ana_tracer_expr, solverobj.fields.tracer_2d)/numpy.sqrt(area)
print_output('L2 error {:.12f}'.format(l2_err))

return l2_err


def run_convergence(ref_list, saveplot=False, **options):
"""Runs test for a list of refinements and computes error convergence rate"""

polynomial_degree = options.get('polynomial_degree', 1)
l2_err = []
for r in ref_list:
l2_err.append(run(r, **options))
x_log = numpy.log10(numpy.array(ref_list, dtype=float)**-1)
y_log = numpy.log10(numpy.array(l2_err))
setup_name = 'h-diffusion'

def check_convergence(x_log, y_log, expected_slope, field_str, saveplot):
slope_rtol = 0.20
slope, intercept, r_value, p_value, std_err = stats.linregress(x_log, y_log)
if saveplot:
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(5, 5))
# plot points
ax.plot(x_log, y_log, 'k.')
x_min = x_log.min()
x_max = x_log.max()
offset = 0.05*(x_max - x_min)
npoints = 50
xx = numpy.linspace(x_min - offset, x_max + offset, npoints)
yy = intercept + slope*xx
# plot line
ax.plot(xx, yy, linestyle='--', linewidth=0.5, color='k')
ax.text(xx[2*int(npoints/3)], yy[2*int(npoints/3)], '{:4.2f}'.format(slope),
verticalalignment='top',
horizontalalignment='left')
ax.set_xlabel('log10(dx)')
ax.set_ylabel('log10(L2 error)')
ax.set_title(' '.join([setup_name, field_str, 'degree={:}'.format(polynomial_degree)]))
ref_str = 'ref-' + '-'.join([str(r) for r in ref_list])
degree_str = 'o{:}'.format(polynomial_degree)
imgfile = '_'.join(['convergence', setup_name, field_str, ref_str, degree_str])
imgfile += '.png'
imgdir = create_directory('plots')
imgfile = os.path.join(imgdir, imgfile)
print_output('saving figure {:}'.format(imgfile))
plt.savefig(imgfile, dpi=200, bbox_inches='tight')
if expected_slope is not None:
err_msg = '{:}: Wrong convergence rate {:.4f}, expected {:.4f}'.format(setup_name, slope, expected_slope)
assert slope > expected_slope*(1 - slope_rtol), err_msg
print_output('{:}: convergence rate {:.4f} PASSED'.format(setup_name, slope))
else:
print_output('{:}: {:} convergence rate {:.4f}'.format(setup_name, field_str, slope))
return slope

check_convergence(x_log, y_log, polynomial_degree+1, 'tracer', saveplot)

# ---------------------------
# standard tests for pytest
# ---------------------------


@pytest.fixture(params=[1])
def polynomial_degree(request):
return request.param


@pytest.mark.parametrize(('stepper'),
[('CrankNicolson')])

def test_horizontal_advection(polynomial_degree, stepper, ):
run_convergence([1, 2, 3], polynomial_degree=polynomial_degree,
timestepper_type=stepper)

# ---------------------------
# run individual setup for debugging
# ---------------------------


if __name__ == '__main__':
run_convergence([1, 2, 3, 4], polynomial_degree=1,
timestepper_type='CrankNicolson',
no_exports=False, saveplot=True)
Loading

0 comments on commit 0e9f313

Please sign in to comment.