Source code for gaussfitter.gaussfitter

"""
===========
gaussfitter
===========
.. codeauthor:: Adam Ginsburg <adam.g.ginsburg@gmail.com> 3/17/08

Latest version available at <https://github.com/keflavich/gaussfitter>, where
it was moved from google code on January 30, 2014

"""
from __future__ import print_function, division, absolute_import

import numpy as np
from numpy.ma import median
from numpy import pi
from .mpfit import mpfit
"""
Note about mpfit/leastsq:
I switched everything over to the Markwardt mpfit routine for a few reasons,
but foremost being the ability to set limits on parameters, not just force them
to be fixed.  As far as I can tell, leastsq does not have that capability.

The version of mpfit I use can be found here:
    http://code.google.com/p/agpy/source/browse/trunk/mpfit

Alternative: lmfit

.. todo::
    -turn into a class instead of a collection of objects
    -implement WCS-based gaussian fitting with correct coordinates
"""


[docs]def moments(data, circle, rotate, vheight, estimator=median, angle_guess=45.0, **kwargs): """ Returns (height, amplitude, x, y, width_x, width_y, rotation angle) the gaussian parameters of a 2D distribution by calculating its moments. Depending on the input parameters, will only output a subset of the above. If using masked arrays, pass estimator=np.ma.median """ total = np.abs(data).sum() Y, X = np.indices(data.shape) # python convention: reverse x,y np.indices y = np.argmax((X*np.abs(data)).sum(axis=1)/total) x = np.argmax((Y*np.abs(data)).sum(axis=0)/total) col = data[int(y), :] width_x = np.sqrt(np.abs((np.arange(col.size)-y)**2*col).sum() / np.abs(col).sum()) row = data[:, int(x)] width_y = np.sqrt(np.abs((np.arange(row.size)-x)**2*row).sum() / np.abs(row).sum()) width = (width_x + width_y) / 2. height = estimator(data.ravel()) amplitude = data.max()-height mylist = [amplitude, x, y] if np.isnan((width_y,width_x,height,amplitude)).any(): raise ValueError("something is nan") if vheight: mylist = [height] + mylist if not circle: mylist = mylist + [width_x, width_y] if rotate: # rotation "moment" is a little above zero to initiate the fitter # with something not locked at the edge of parameter space mylist = mylist + [angle_guess] # also, circles don't rotate. else: mylist = mylist + [width] return mylist
[docs]def twodgaussian(inpars, circle=False, rotate=True, vheight=True, shape=None): """ Returns a 2d gaussian function of the form: x' = np.cos(rota) * x - np.sin(rota) * y y' = np.sin(rota) * x + np.cos(rota) * y (rota should be in degrees) g = b + a * np.exp ( - ( ((x-center_x)/width_x)**2 + ((y-center_y)/width_y)**2 ) / 2 ) inpars = [b,a,center_x,center_y,width_x,width_y,rota] (b is background height, a is peak amplitude) where x and y are the input parameters of the returned function, and all other parameters are specified by this function However, the above values are passed by list. The list should be: inpars = (height,amplitude,center_x,center_y,width_x,width_y,rota) You can choose to ignore / neglect some of the above input parameters using the following options: Parameters ---------- circle : bool default is an elliptical gaussian (different x, y widths), but can reduce the input by one parameter if it's a circular gaussian rotate : bool default allows rotation of the gaussian ellipse. Can remove last parameter by setting rotate=0 vheight : bool default allows a variable height-above-zero, i.e. an additive constant for the Gaussian function. Can remove first parameter by setting this to 0 shape : tuple if shape is set (to a 2-parameter list) then returns an image with the gaussian defined by inpars """ inpars_old = inpars inpars = list(inpars) if vheight: height = inpars.pop(0) height = float(height) else: height = float(0) amplitude, center_y, center_x = inpars.pop(0), inpars.pop(0), inpars.pop(0) amplitude = float(amplitude) center_x = float(center_x) center_y = float(center_y) if circle: width = inpars.pop(0) width_x = float(width) width_y = float(width) rotate = 0 else: width_x, width_y = inpars.pop(0), inpars.pop(0) width_x = float(width_x) width_y = float(width_y) if rotate: rota = inpars.pop(0) rota = pi/180. * float(rota) rcen_x = center_x * np.cos(rota) - center_y * np.sin(rota) rcen_y = center_x * np.sin(rota) + center_y * np.cos(rota) else: rcen_x = center_x rcen_y = center_y if len(inpars) > 0: raise ValueError("There are still input parameters:" + str(inpars) + " and you've input: " + str(inpars_old) + " circle=%d, rotate=%d, vheight=%d" % (circle, rotate, vheight)) def rotgauss(x, y): if rotate: xp = x * np.cos(rota) - y * np.sin(rota) yp = x * np.sin(rota) + y * np.cos(rota) else: xp = x yp = y g = height+amplitude*np.exp(-(((rcen_x-xp)/width_x)**2 + ((rcen_y-yp)/width_y)**2)/2.) return g if shape is not None: return rotgauss(*np.indices(shape)) else: return rotgauss
[docs]def gaussfit(data, err=None, params=(), autoderiv=True, return_error=False, circle=False, fixed=np.repeat(False, 7), limitedmin=[False, False, False, False, True, True, True], limitedmax=[False, False, False, False, False, False, True], usemoment=np.array([], dtype='bool'), minpars=np.repeat(0, 7), maxpars=[0, 0, 0, 0, 0, 0, 180], rotate=True, vheight=True, quiet=True, returnmp=False, returnfitimage=False, **kwargs): """ Gaussian fitter with the ability to fit a variety of different forms of 2-dimensional gaussian. Parameters ---------- data : `numpy.ndarray` 2-dimensional data array err : `numpy.ndarray` or None error array with same size as data array. Defaults to 1 everywhere. params : (height, amplitude, x, y, width_x, width_y, rota) Initial input parameters for Gaussian function. If not input, these will be determined from the moments of the system, assuming no rotation autoderiv : bool Use the autoderiv provided in the lmder.f function (the alternative is to us an analytic derivative with lmdif.f: this method is less robust) return_error : bool Default is to return only the Gaussian parameters. If ``True``, return fit params & fit error returnfitimage : bool returns (best fit params,best fit image) returnmp : bool returns the full mpfit struct circle : bool The default is to fit an elliptical gaussian (different x, y widths), but the input is reduced by one parameter if it's a circular gaussian. rotate : bool Allow rotation of the gaussian ellipse. Can remove last parameter of input & fit by setting rotate=False. Angle should be specified in degrees. vheight : bool Allows a variable height-above-zero, i.e. an additive constant background for the Gaussian function. Can remove the first fitter parameter by setting this to ``False`` usemoment : `numpy.ndarray`, dtype='bool' Array to choose which parameters to use a moment estimation for. Other parameters will be taken from params. Returns ------- (params, [parerr], [fitimage]) | (mpfit, [fitimage]) parameters : list The default output is a set of Gaussian parameters with the same shape as the input parameters fitimage : `numpy.ndarray` If returnfitimage==True, the last return will be a 2D array holding the best-fit model mpfit : `mpfit` object If ``returnmp==True`` returns a `mpfit` object. This object contains a `covar` attribute which is the 7x7 covariance array generated by the mpfit class in the `mpfit_custom.py` module. It contains a `param` attribute that contains a list of the best fit parameters in the same order as the optional input parameter `params`. """ data = data.view(np.ma.MaskedArray).view('float') usemoment = np.array(usemoment, dtype='bool') params = np.array(params, dtype='float') if usemoment.any() and len(params) == len(usemoment): moment = np.array(moments(data, circle, rotate, vheight, **kwargs), dtype='float') params[usemoment] = moment[usemoment] elif params == [] or len(params) == 0: params = (moments(data, circle, rotate, vheight, **kwargs)) if not vheight: # If vheight is not set, we set it for sub-function calls but fix the # parameter at zero vheight=True params = np.concatenate([[0],params]) fixed[0] = 1 # mpfit will fail if it is given a start parameter outside the allowed range: for i in range(len(params)): if params[i] > maxpars[i] and limitedmax[i]: params[i] = maxpars[i] if params[i] < minpars[i] and limitedmin[i]: params[i] = minpars[i] # One time: check if error is set, otherwise fix it at 1. err = err if err is not None else 1.0 def mpfitfun(data, err): def f(p, fjac): twodg = twodgaussian(p, circle, rotate, vheight) delta = (data - twodg(*np.indices(data.shape))) / err return [0, delta.compressed()] return f parinfo = [{'n': 1, 'value': params[1], 'limits': [minpars[1], maxpars[1]], 'limited': [limitedmin[1], limitedmax[1]], 'fixed': fixed[1], 'parname': "AMPLITUDE", 'error': 0}, {'n': 2, 'value': params[2], 'limits': [minpars[2], maxpars[2]], 'limited': [limitedmin[2], limitedmax[2]], 'fixed': fixed[2], 'parname': "XSHIFT", 'error': 0}, {'n': 3, 'value': params[3], 'limits': [minpars[3], maxpars[3]], 'limited': [limitedmin[3], limitedmax[3]], 'fixed': fixed[3], 'parname': "YSHIFT", 'error': 0}, {'n': 4, 'value': params[4], 'limits': [minpars[4], maxpars[4]], 'limited': [limitedmin[4], limitedmax[4]], 'fixed': fixed[4], 'parname': "XWIDTH", 'error': 0}] if vheight: parinfo.insert(0, {'n': 0, 'value': params[0], 'limits': [minpars[0], maxpars[0]], 'limited': [limitedmin[0], limitedmax[0]], 'fixed': fixed[0], 'parname': "HEIGHT", 'error': 0}) if not circle: parinfo.append({'n': 5, 'value': params[5], 'limits': [minpars[5], maxpars[5]], 'limited': [limitedmin[5], limitedmax[5]], 'fixed': fixed[5], 'parname': "YWIDTH", 'error': 0}) if rotate: parinfo.append({'n': 6, 'value': params[6], 'limits': [minpars[6], maxpars[6]], 'limited': [limitedmin[6], limitedmax[6]], 'fixed': fixed[6], 'parname': "ROTATION", 'error': 0}) if not autoderiv: # the analytic derivative, while not terribly difficult, is less # efficient and useful. I only bothered putting it here because I was # instructed to do so for a class project - please ask if you would # like this feature implemented raise NotImplementedError("I'm sorry, I haven't implemented this feature yet. " "Given that I wrote this message in 2008, " "it will probably never be implemented.") else: mp = mpfit(mpfitfun(data, err), parinfo=parinfo, quiet=quiet) if mp.errmsg: raise Exception("MPFIT error: {0}".format(mp.errmsg)) if (not circle) and rotate: mp.params[-1] %= 180.0 mp.chi2 = mp.fnorm try: mp.chi2n = mp.fnorm/mp.dof except ZeroDivisionError: mp.chi2n = np.nan if returnmp: returns = (mp) elif return_error: returns = mp.params,mp.perror else: returns = mp.params if returnfitimage: fitimage = twodgaussian(mp.params, circle, rotate, vheight)(*np.indices(data.shape)) returns = (returns, fitimage) return returns
[docs]def onedmoments(Xax, data, vheight=True, estimator=median, negamp=None, veryverbose=False, **kwargs): """Returns (height, amplitude, x, width_x) the gaussian parameters of a 1D distribution by calculating its moments. Depending on the input parameters, will only output a subset of the above. If using masked arrays, pass estimator=np.ma.median 'estimator' is used to measure the background level (height) negamp can be used to force the peak negative (True), positive (False), or it will be "autodetected" (negamp=None) """ dx = np.mean(Xax[1:] - Xax[:-1]) # assume a regular grid integral = (data*dx).sum() height = estimator(data) # try to figure out whether pos or neg based on the minimum width of the pos/neg peaks Lpeakintegral = integral - height*len(Xax)*dx - (data[data > height]*dx).sum() Lamplitude = data.min()-height Lwidth_x = 0.5*(np.abs(Lpeakintegral / Lamplitude)) Hpeakintegral = integral - height*len(Xax)*dx - (data[data < height]*dx).sum() Hamplitude = data.max()-height Hwidth_x = 0.5*(np.abs(Hpeakintegral / Hamplitude)) Lstddev = Xax[data < data.mean()].std() Hstddev = Xax[data > data.mean()].std() # print "Lstddev: %10.3g Hstddev: %10.3g" % (Lstddev,Hstddev) # print "Lwidth_x: %10.3g Hwidth_x: %10.3g" % (Lwidth_x,Hwidth_x) if negamp: # can force the guess to be negative xcen, amplitude, width_x = Xax[np.argmin(data)], Lamplitude, Lwidth_x elif negamp is None: if Hstddev < Lstddev: xcen, amplitude, width_x, = Xax[np.argmax(data)], Hamplitude, Hwidth_x else: xcen, amplitude, width_x, = Xax[np.argmin(data)], Lamplitude, Lwidth_x else: # if negamp==False, make positive xcen, amplitude, width_x = Xax[np.argmax(data)], Hamplitude, Hwidth_x if veryverbose: print("negamp: %s amp,width,cen Lower: %g, %g Upper: %g, %g Center: %g" %\ (negamp, Lamplitude, Lwidth_x, Hamplitude, Hwidth_x, xcen)) mylist = [amplitude, xcen, width_x] if np.isnan(width_x) or np.isnan(height) or np.isnan(amplitude): raise ValueError("something is nan") if vheight: mylist = [height] + mylist return mylist
[docs]def onedgaussian(x, H, A, dx, w): """ Returns a 1-dimensional gaussian of form H+A*np.exp(-(x-dx)**2/(2*w**2)) """ return H+A*np.exp(-(x-dx)**2/(2*w**2))
[docs]def onedgaussfit(xax, data, err=None, params=[0, 1, 0, 1], fixed=[False, False, False, False], limitedmin=[False, False, False, True], limitedmax=[False, False, False, False], minpars=[0, 0, 0, 0], maxpars=[0, 0, 0, 0], quiet=True, shh=True, veryverbose=False, vheight=True, negamp=False, usemoments=False): """ Parameters ---------- xax : np.array x axis data : np.array y axis err : np.array error corresponding to data params : tuple Fit parameters: Height of background, Amplitude, Shift, Width fixed : bool Is parameter fixed? limitedmin/minpars : tuple set lower limits on each parameter (default: width>0) limitedmax/maxpars : tuple set upper limits on each parameter quiet : bool should MPFIT output each iteration? shh : bool output final parameters? usemoments : bool replace default parameters with moments Returns ------- Fit parameters Model Fit errors chi2 """ def mpfitfun(x, y, err): if err is None: def f(p, fjac=None): return [0, (y-onedgaussian(x, *p))] else: def f(p, fjac=None): return [0, (y-onedgaussian(x, *p))/err] return f if xax is None: xax = np.arange(len(data)) if not vheight: height = params[0] fixed[0] = True if usemoments: params = onedmoments(xax, data, vheight=vheight, negamp=negamp, veryverbose=veryverbose) if vheight is False: params = [height]+params if veryverbose: print("OneD moments: h: %g a: %g c: %g w: %g" % tuple(params)) parinfo = [{'n': 0, 'value': params[0], 'limits': [minpars[0], maxpars[0]], 'limited': [limitedmin[0], limitedmax[0]], 'fixed': fixed[0], 'parname': "HEIGHT", 'error': 0}, {'n': 1, 'value': params[1], 'limits': [minpars[1], maxpars[1]], 'limited': [limitedmin[1], limitedmax[1]], 'fixed': fixed[1], 'parname': "AMPLITUDE", 'error': 0}, {'n': 2, 'value': params[2], 'limits': [minpars[2], maxpars[2]], 'limited': [limitedmin[2], limitedmax[2]], 'fixed': fixed[2], 'parname': "SHIFT", 'error': 0}, {'n': 3, 'value': params[3], 'limits': [minpars[3], maxpars[3]], 'limited': [limitedmin[3], limitedmax[3]], 'fixed': fixed[3], 'parname': "WIDTH", 'error': 0}] mp = mpfit(mpfitfun(xax, data, err), parinfo=parinfo, quiet=quiet) mpp = mp.params mpperr = mp.perror chi2 = mp.fnorm if mp.status == 0: raise Exception(mp.errmsg) if (not shh) or veryverbose: print("Fit status: ", mp.status) for i, p in enumerate(mpp): parinfo[i]['value'] = p print(parinfo[i]['parname'], p, " +/- ", mpperr[i]) print("Chi2: ", mp.fnorm, " Reduced Chi2: ", mp.fnorm/len(data), " DOF:", len(data)-len(mpp)) return mpp, onedgaussian(xax, *mpp), mpperr, chi2
[docs]def n_gaussian(pars=None, a=None, dx=None, sigma=None): """ Returns a function that sums over N gaussians, where N is the length of a,dx,sigma *OR* N = len(pars) / 3 The background "height" is assumed to be zero (you must "baseline" your spectrum before fitting) pars - a list with len(pars) = 3n, assuming a,dx,sigma repeated dx - offset (velocity center) values sigma - line widths a - amplitudes """ if len(pars) % 3 == 0: a = [pars[ii] for ii in range(0, len(pars), 3)] dx = [pars[ii] for ii in range(1, len(pars), 3)] sigma = [pars[ii] for ii in range(2, len(pars), 3)] elif not(len(dx) == len(sigma) == len(a)): raise ValueError("Wrong array lengths! dx: %i sigma: %i a: %i" % (len(dx), len(sigma), len(a))) def g(x): v = np.zeros(len(x)) for i in range(len(dx)): v += a[i] * np.exp(-(x-dx[i])**2 / (2.0*sigma[i]**2)) return v return g
[docs]def multigaussfit(xax, data, ngauss=1, err=None, params=[1, 0, 1], fixed=[False, False, False], limitedmin=[False, False, True], limitedmax=[False, False, False], minpars=[0, 0, 0], maxpars=[0, 0, 0], quiet=True, shh=True, veryverbose=False): """ An improvement on onedgaussfit. Lets you fit multiple gaussians. Parameters ---------- xax : np.array x axis data : np.array y axis err : np.array or None error corresponding to data ngauss : int How many gaussians to fit? Default 1 (this could supersede onedgaussfit). Parameters below need to have lenght of 3*ngauss. If ``ngauss``>1 and their lenght is 3, they will be replicated ngaus times, otherwise they will be reset to defaults. params : list Fit parameters: [amplitude, offset, width] * ngauss If len(params) % 3 == 0, ngauss will be set to len(params) / 3 fixed : list of bools Is parameter fixed? limitedmin/minpars : list set lower limits on each parameter (default: width>0) limitedmax/maxpars : list set upper limits on each parameter minpars : list maxpars : list quiet : bool should MPFIT output each iteration? shh : bool output final parameters? veryverbose : bool Returns ------- Fit parameters Model Fit errors chi2 """ if len(params) != ngauss and (len(params) // 3) > ngauss: ngauss = len(params) // 3 if isinstance(params, np.ndarray): params = params.tolist() # make sure all various things are the right length; if they're not, fix them using the defaults for parlist in (params, fixed, limitedmin, limitedmax, minpars, maxpars): if len(parlist) != 3*ngauss: # if you leave the defaults, or enter something that can be multiplied by 3 to get # to the right number of gaussians, it will just replicate if len(parlist) == 3: parlist *= ngauss elif parlist == params: parlist[:] = [1, 0, 1] * ngauss elif parlist == fixed or parlist == limitedmax: parlist[:] = [False, False, False] * ngauss elif parlist == limitedmin: parlist[:] = [False, False, True] * ngauss elif parlist == minpars or parlist == maxpars: parlist[:] = [0, 0, 0] * ngauss def mpfitfun(x, y, err): if err is None: def f(p, fjac=None): return [0, (y-n_gaussian(pars=p)(x))] else: def f(p, fjac=None): return [0, (y-n_gaussian(pars=p)(x))/err] return f if xax is None: xax = np.arange(len(data)) parnames = {0: "AMPLITUDE", 1: "SHIFT", 2: "WIDTH"} parinfo = [{'n': ii, 'value': params[ii], 'limits': [minpars[ii], maxpars[ii]], 'limited': [limitedmin[ii], limitedmax[ii]], 'fixed': fixed[ii], 'parname': parnames[ii % 3]+str(ii % 3), 'error': ii} for ii in range(len(params))] if veryverbose: print("GUESSES: ") print("\n".join(["%s: %s" % (p['parname'], p['value']) for p in parinfo])) mp = mpfit(mpfitfun(xax, data, err), parinfo=parinfo, quiet=quiet) mpp = mp.params mpperr = mp.perror chi2 = mp.fnorm if mp.status == 0: raise Exception(mp.errmsg) if not shh: print("Final fit values: ") for i, p in enumerate(mpp): parinfo[i]['value'] = p print(parinfo[i]['parname'], p, " +/- ", mpperr[i]) print("Chi2: ", mp.fnorm, " Reduced Chi2: ", mp.fnorm/len(data), " DOF:", len(data)-len(mpp)) return mpp, n_gaussian(pars=mpp)(xax), mpperr, chi2
[docs]def collapse_gaussfit(cube, xax=None, axis=2, negamp=False, usemoments=True, nsigcut=1.0, mppsigcut=1.0, return_errors=False, **kwargs): import time std_coll = cube.std(axis=axis) std_coll[std_coll == 0] = np.nan # must eliminate all-zero spectra mean_std = median(std_coll[std_coll == std_coll]) if axis > 0: cube = cube.swapaxes(0, axis) width_arr = np.zeros(cube.shape[1:]) + np.nan amp_arr = np.zeros(cube.shape[1:]) + np.nan chi2_arr = np.zeros(cube.shape[1:]) + np.nan offset_arr = np.zeros(cube.shape[1:]) + np.nan width_err = np.zeros(cube.shape[1:]) + np.nan amp_err = np.zeros(cube.shape[1:]) + np.nan offset_err = np.zeros(cube.shape[1:]) + np.nan if xax is None: xax = np.arange(cube.shape[0]) starttime = time.time() print("Cube shape: ", cube.shape) extremum = np.min if negamp else np.max print("Fitting a total of %i spectra with peak signal above %f" %\ ((np.abs(extremum(cube, axis=0)) > (mean_std*nsigcut)).sum(), mean_std*nsigcut)) for i in range(cube.shape[1]): t0 = time.time() nspec = (np.abs(extremum(cube[:,i,:], axis=0)) > (mean_std*nsigcut)).sum() print("Working on row %d with %d spectra to fit" % (i, nspec), end=' ') for j in range(cube.shape[2]): if np.abs(extremum(cube[:,i,j])) > (mean_std*nsigcut): mpp, gfit, mpperr, chi2 = onedgaussfit(xax, cube[:,i,j], err=np.ones(cube.shape[0])*mean_std, negamp=negamp, usemoments=usemoments, **kwargs) if np.abs(mpp[1]) > (mpperr[1]*mppsigcut): width_arr[i,j] = mpp[3] offset_arr[i,j] = mpp[2] chi2_arr[i,j] = chi2 amp_arr[i,j] = mpp[1] width_err[i,j] = mpperr[3] offset_err[i,j] = mpperr[2] amp_err[i,j] = mpperr[1] dt = time.time()-t0 if nspec > 0: print("in %f seconds (average: %f)" % (dt, dt/float(nspec))) else: print("in %f seconds" % (dt)) print("Total time %f seconds" % (time.time()-starttime)) if return_errors: return width_arr, offset_arr, amp_arr, width_err, offset_err, amp_err, chi2_arr else: return width_arr, offset_arr, amp_arr, chi2_arr