Source code for sf_deconvolve

#! /usr/bin/env python
# -*- coding: utf-8 -*-

"""SF_DECONVOLVE

This module is executable and contains methods for deconvolving a set of
observed galaxy images.

:Author: Samuel Farrens <samuel.farrens@gmail.com>

:Version: 4.3

:Date: 23/10/2017

Notes
-----
This code implements equations 8 and 17 from [F2017]_.

"""

from __future__ import print_function
import numpy as np
from os.path import splitext
from lib.args import get_opts
from lib.file_io import *
from lib.deconvolve import run
from lib.tests import test_deconvolution, test_psf_estimation
from sf_tools.interface.errors import catch_error, warn
from sf_tools.interface.log import set_up_log, close_log


[docs]def set_out_string(): """Set output string This method checks if an output string has been specified and if not creates and ouput string from the input string and the mode """ if isinstance(opts.output, type(None)): opts.output = splitext(opts.input)[0] + '_' + opts.mode
[docs]def check_psf(data, log): """Check PSF This method checks that the input PSFs are properly normalised and updates the opts namespace with the PSF type Parameters ---------- data : np.ndarray Input data array log : logging.Logger Log instance """ psf_sum = np.sum(data, axis=tuple(range(data.ndim - 2, data.ndim))) if not np.all(np.abs(psf_sum - 1) < 1e-5): warn('Not all PSFs integrate to 1.0.') log.info(' - Not all PSFs integrate to 1.0.') if data.ndim == 2: opts.psf_type = 'fixed'
[docs]def run_script(log): """Run script This method runs the script. Parameters ---------- log : logging.Logger Log instance """ h_line = ' ' + '-' * 70 print(h_line) # Begin log output_text = ' Running SF_DECONVOLVE' print(output_text) log.info(output_text) opts.log = log print(h_line) ########################################################################### # Read the input data files data_noisy, psf_data, primal = read_input_files(opts.input, opts.psf_file, opts.current_res) check_psf(psf_data, log) opts.primal = primal ########################################################################### # Log input options print(' - Input File:', opts.input) print(' - PSF File:', opts.psf_file) log.info(' - Input File: ' + opts.input) log.info(' - PSF File: ' + opts.psf_file) if not isinstance(opts.current_res, type(None)): print(' - Current Results:', opts.current_res) log.info(' - Current Results: ' + opts.current_res) # Log set-up options print(' - Mode:', opts.mode) print(' - PSF Type:', opts.psf_type) print(' - Gradient Type: ', opts.grad_type) print(' - Optimisation:', opts.opt_type) print(' - Positivity:', not opts.no_pos) print(' - Number of Reweightings:', opts.n_reweights) print(' - Number of Iterations:', opts.n_iter) print(' - Cost Function Window:', opts.cost_window) print(' - Convergence Tolerance:', opts.convergence) print(' - Suppress Plots:', opts.no_plots) log.info(' - Mode: ' + opts.mode) log.info(' - PSF Type: ' + opts.psf_type) log.info(' - Gradient Type: ' + opts.grad_type) log.info(' - Optimisation: ' + opts.opt_type) log.info(' - Positivity: ' + str(not opts.no_pos)) log.info(' - Number of Reweightings: ' + str(opts.n_reweights)) log.info(' - Number of Iterations: ' + str(opts.n_iter)) log.info(' - Cost Function Window: ' + str(opts.cost_window)) log.info(' - Convergence Tolerance: ' + str(opts.convergence)) log.info(' - Suppress Plots:' + str(opts.no_plots)) # Log sparsity options if opts.mode in ('all', 'sparse'): print(' - Wavelet Type:', opts.wavelet_type) print(' - Wavelet Threshold Factor:', opts.wave_thresh_factor) log.info(' - Wavelet Type: ' + str(opts.wavelet_type)) log.info(' - Wavelet Threshold Factor: ' + str(opts.wave_thresh_factor)) # Log low-rank options if opts.mode in ('all', 'lowr'): print(' - Low Rank Threshold Factor:', opts.lowr_thresh_factor) print(' - Low Rank Threshold Type:', opts.lowr_thresh_type) print(' - Low Rank Type:', opts.lowr_type) log.info(' - Low Rank Threshold Factor: ' + str(opts.lowr_thresh_factor)) log.info(' - Low Rank Threshold Type: ' + str(opts.lowr_thresh_type)) log.info(' - Low Rank Type: ' + str(opts.lowr_type)) # Log PSF estimation options if opts.grad_type == 'psf_unknown': print(' - PSF Estimation Control Parameter (Lambda):', opts.lambda_psf) print(' - PSF Estimation Gradient Step Size (Beta):', opts.beta_psf) log.info(' - PSF Estimation Lambda: ' + str(opts.lambda_psf)) log.info(' - PSF Estimation Beta: ' + str(opts.beta_psf)) # Log PSF estimation options if opts.grad_type == 'shape': print(' - Shape Constraint Control Parameter (Lambda):', opts.lambda_shape) log.info(' - Shape Constraint Lambda: ' + str(opts.lambda_shape)) print(h_line) ########################################################################### # Perform deconvolution. results = run(data_noisy, psf_data, **vars(opts)) ########################################################################### if not isinstance(opts.clean_data, type(None)): print(h_line) ####################################################################### # Test the deconvolution image_errors = test_deconvolution(results[0], opts.clean_data, opts.random_seed, opts.kernel, opts.metric) ####################################################################### # Log testing options print(' - Clean Data:', opts.clean_data) print(' - Random Seed:', opts.random_seed) log.info(' - Clean Data: ' + opts.clean_data) log.info(' - Random Seed: ' + str(opts.random_seed)) # Log kernel options if not isinstance(opts.kernel, type(None)): print(' - Gaussian Kernel:', opts.kernel) log.info(' - Gaussian Kernel: ' + str(opts.kernel)) # Log test results print(' - Galaxy Pixel Error:', image_errors[0]) print(' - Galaxy Shape Error:', image_errors[1]) print(' - Galaxy PSNR:', image_errors[2]) log.info(' - Galaxy Pixel Error: ' + str(image_errors[0])) log.info(' - Galaxy Shape Error: ' + str(image_errors[1])) log.info(' - Galaxy PSNR: ' + str(image_errors[2])) if (not isinstance(opts.true_psf, type(None)) and opts.grad_type == 'psf_unknown'): ################################################################### # Test the psf estimation psf_errors = test_psf_estimation(results[2], opts.true_psf, opts.kernel, opts.metric) ################################################################### print('') # Log testing options print(' - True PSFs:', opts.true_psf) log.info(' - True PSFs: ' + opts.true_psf) # Log test results print(' - PSF Pixel Error:', psf_errors[0]) print(' - PSF Shape Error:', psf_errors[1]) print(' - PSF PSNR:', psf_errors[2]) log.info(' - PSF Pixel Error: ' + str(psf_errors[0])) log.info(' - PSF Shape Error: ' + str(psf_errors[1])) log.info(' - PSF PSNR: ' + str(psf_errors[2])) print(h_line) ########################################################################### # Save outputs to numpy binary files write_output_files(opts.output, results[0], dual_res=results[1], psf_res=results[2], output_format=opts.output_format) ########################################################################### # Log output options print(' Output 1 saved to: ' + opts.output + '_primal' + '.' + opts.output_format) log.info('Output 1 saved to: ' + opts.output + '_primal' + '.' + opts.output_format) if opts.opt_type == 'condat': print(' Output 2 saved to: ' + opts.output + '_dual' + '.' + opts.output_format) log.info('Output 2 saved to: ' + opts.output + '_dual' + '.' + opts.output_format) if opts.grad_type == 'psf_unknown': print(' Output 3 saved to: ' + opts.output + '_psf' + '.' + opts.output_format) log.info('Output 3 saved to: ' + opts.output + '_psf' + '.' + opts.output_format) # Close log log.info('Script successfully completed!') log.info('') close_log(log) print(h_line)
[docs]def main(args=None): try: global opts opts = get_opts(args) set_out_string() log = set_up_log(opts.output) run_script(log) except Exception as err: catch_error(err, log) return 1
if __name__ == "__main__": main()