Source code for contur.factories.test_observable

import contur
import rivet
import yoda
import numpy as np
import traceback
import sys
import scipy.stats as spstat

import contur.config.config as cfg
import contur.data.static_db as cdb
import contur.util.utils as cutil
import contur.factories.likelihood as lh

[docs] class Observable(object): """ Processes and decorates :class:`YODA.AnalysisObject` to a testable format :param ana_obj: ``YODA`` AO to dress, containing signal info. :type ana_obj: :class:`YODA.AnalysisObject` :param xsec: _XSEC scatter recording generator cross section in YODA file (*contained in all Rivet run outputs*) :type xsec: :class:`YODA.Scatter1D` :param nev: _EVTCOUNT scatter recording total generated events in YODA file (*contained in all Rivet run outputs*) :type nev: :class:`YODA.Scatter1D` :param sm: Standard Model prediction for this observable :type sm: :class:`SMPrediction` """ def __init__(self, ana_obj, xsec, nev, sm=None): from contur.factories.yoda_factories import root_n_errors self.signal = ana_obj self.xsec = xsec self.nev = nev # Measurement self._ref = None # SM theory self._thy = None self.pool = None # check we are using the right HepMC weight (if supplied) and remove it from the path self._weight = rivet.extractWeightName(self.signal.path()) if self._weight != cfg.weight: return self.signal.setPath(rivet.stripWeightName(self.signal.path())) # Initialize the public members we always want to access self._isRatio = cdb.isRatio(self.signal.path()) self._isProfile = False self._isSearch = cdb.isSearch(self.signal.path()) self._stack_smbg = yoda.Scatter2D self._stack_databg = yoda.Scatter2D # these should just be used for plotting, which is not done if writer is silenced (e.g. in gridMode) self._refplot = None self._sigplot = None self._thyplot = None self._lumi = 1 self._lumi_fb = 1 self._isScaled = False self._scaleFactorData = 1 self._scaleFactorSig = 1 self._conturPoints = [] self._nev_differential = 1.0 self._likelihood = None self.measured_values = None self.bsm_values = None self.sm_values = None self.expected_values = None self.sm_prediction = sm # Call the internal functions on initialization # to fill the above members with what we want, these should all be private self.__getAux() if self.pool is None: return # Get the measurement reference data. if not self.__getData(): return # Get the theory reference data, if present, and set the appropriate # convenience flag if successful. self.__getThy() self._useTheory = self.sm_values is not None self.__getisScaled() # build the expected values (SM theory with data uncertainties) self.__getExpected() #cfg.contur_log.debug("Measured: VALS: {} \n COV: {} \n DIAG {} \n ERRS {}".format( # self.measured_values.central_values,self.measured_values.covariance_matrix, # self.measured_values.diagonal_matrix,self.measured_values.err_breakdown)) # Determine the type of object we have, and build a 2D scatter from it if it is not one already # Also recalculate scalefactor, if appropriate if self.signal.type() in ['Histo1D', 'Profile1D', 'Counter','Scatter2D']: self._isProfile = self.signal.type().startswith("Profile") if self._isScaled and xsec is not None: # if the plot is area normalised (ie scaled), work out the factor from number of events and generator xs # (this is just the integrated cross section associated with the plot) # TODO: should maybe be using effNumEntries to correctly handle weighted events? try: self._scaleFactorSig = self.xsec.val()*self.signal.annotation("ScaledBy", 1.0)/float(self.nev.val()) except AttributeError: try: # yoda 1 case self._scaleFactorSig = self.xsec.point(0).x()*self.signal.annotation("ScaledBy", 1.0)/float(self.nev.numEntries()) except Exception as e: cfg.contur_log.debug("missing info for scalefactor calc", exc_info=e) self._scaleFactorSig=1.0 self.signal = yoda.plotting.utils.mkPlotFriendlyScatter(self.signal,includeOverflows=False,includeMaskedBins=False) # Make sure it is actually a Scatter2D - mkScatter makes Scatter1D from counter. self.signal = yoda.plotting.utils.mkPlotFriendlyScatter(self.signal,includeOverflows=False,includeMaskedBins=False) if not cfg.silenceWriter: # Public member function to build plots needed for direct histogram visualisation # avoid calling YODA.clone() unless we have to # Must be called before scaling. if self._ref: self.doPlot() else: cfg.contur_log.warning("No reference data found for histo: {}".format(self.signal.path())) # if everything we need is available, there will be ref data. if self._ref and self.xsec: if self._isScaled: self.__doScale() # include the stat uncertainties on the expected number of events. root_n_errors(self.signal,self._isSearch,nx=self._nev_differential,lumi=self._lumi) self.__fillBucket() # print("BSM: VALS: {} \n COV: {} \n DIAG {} \n ERRS {}".format( # self.bsm_values.central_values,self.bsm_values.covariance_matrix, # self.bsm_values.diagonal_matrix,self.bsm_values.err_breakdown)) def __getisScaled(self): """Internal function to look up Scaling attributes from the contur database, defined in :mod:`contur.data` :Built members: * *isScaled* (``bool``) -- True if some scaling has been applied to this histogram * *scaleFactorData* (``float``) -- Factor to scale the ref data by (n count) to undo the normalisation """ self._isScaled, self._scaleFactorData, self._nev_differential = cdb.isNorm(self.signal.path()) def __getData(self): """ Internal function to look up the refdata :Built members: * *ref* (:class:YODA.Scatter2D) -- Reference scatter plot matching path from input signal aos * *measured_values* (:class:ObservableValues) -- central values, covariance etc for the measuremwent """ try: self._ref = cfg.refObj["/REF" + rivet.stripOptions(self.signal.path())] # extract the bin widths self._bin_widths = [] for xerr in self._ref.xErrs(): self._bin_widths.append(xerr[0]) except KeyError: return False try: cov = cfg.refCorr[self._ref.path()].copy() nuisErrs = cfg.refErrors[self._ref.path()].copy() cfg.contur_log.debug("Attempting to use correlation info for {}".format(self.signal.path())) except Exception as e: cfg.contur_log.debug("No correlation info for {}".format(self.signal.path())) cov = None nuisErrs = None try: uncov = cfg.refUncorr[self._ref.path()].copy() except: uncov = None return False self.measured_values = ObservableValues(bin_widths=self._bin_widths, central_values=self._ref.yVals(), err_breakdown=nuisErrs, covariance_matrix=cov, diagonal_matrix=uncov ) return True def __getExpected(self): """ Internal function to get the expected values (SM cental values with data uncertainty) :Modified members: * *expected_values* (:class:YODA.Scatter2D) -- Reference scatter plot matching path from input signal aos """ # can't do this if we don't have a theory prediction! if self.sm_values is None: return self._expected = self._ref.clone() for i in range(0, len(self._expected.points())): self._expected.points()[i].setY(self._thy.points()[i].y()) if self.measured_values.covariance_matrix is not None: cov = self.measured_values.covariance_matrix.copy() errs = self.measured_values.err_breakdown.copy() else: cov = None errs = None self.expected_values = ObservableValues(bin_widths=self._bin_widths, central_values=self.sm_values.central_values.copy(), err_breakdown=errs, covariance_matrix=cov, diagonal_matrix=self.measured_values.diagonal_matrix.copy() ) def __getThy(self): """ Internal function to look up the SM theory data :Built members: * *thy* (:class:YODA.Scatter2D) -- SM Theory prediction matching path from input signal aos * *sm_values* (:class:ObservableValues) -- correlation matrix, central values etc for the SM theory prediction """ # find whether theory is always required for this histogram self._theoryComp = cdb.theoryComp(self.signal.path()) # if the SM prediction was not already set, try to populate it. try: if self.sm_prediction is None: self.sm_prediction = cfg.sm_prediction[self.analysis.name] path = "/THY"+self.signal.path() self._thy = self.sm_prediction.ao[path].clone() except KeyError: # no SM prediction for this plot. return try: thCov = self.sm_prediction.corr[self._thy.path()].copy() thErrs = self.sm_prediction.errors[self._thy.path()].copy() except: thCov = None thErrs = None try: thUncov = self.sm_prediction.uncorr[self._thy.path()].copy() except KeyError: thUncov = None # just warn if we can't build theory, it's less important... cfg.contur_log.warning( "Could not build any theory error source for %s" % self.signal.path()) cfg.contur_log.debug( "Using theory for {}".format(self._thy.path())) self.sm_values = ObservableValues(bin_widths=self._bin_widths, central_values=self._thy.yVals(), err_breakdown=thErrs, covariance_matrix=thCov, diagonal_matrix=thUncov )
[docs] def doPlot(self): """ Public member function to build yoda plot members for interactive runs These are only for displaty, they are not used in any of the statistics calculations. """ # see if there are unscaled versions of the histos try: self._refplot = cfg.plotObj[self._ref.path()] except KeyError: # otherwise the standard ref should be unscaled self._refplot = self._ref.clone() # and the same thought process for the background model, and for the theory (even if the # theory is not being used as background). if self.sm_values is not None: try: self._thyplot = self.sm_prediction.plotObj[self._thy.path()] except KeyError: self._thyplot = self.sm_prediction.ao[self._thy.path()] # build stack for plotting, for histogrammed data if not self._isRatio and not cfg.sig_plus_bg: self.__buildStack() else: self._stack_databg = self.signal.clone() self._stack_smbg = self.signal.clone() self._sigplot = self.signal.clone()
def __getAux(self): """Internal function to look up auxiliary attributes from the contur database :Built members: * *pool* (``string``) -- String for analysis pool looked up from contur database * *subpool* (``string``) -- String for analysis subpool looked up from contur database """ ana_name, self.histo_name = cutil.splitPath(self.signal.path()) self.analysis, self._lumi, self._lumi_fb, self.pool, self.subpool = cdb.obsFinder(self.signal.path()) def __buildStack(self): """ Private function to stack the signal on backgrounds for easier visualisation """ if self.signal.type() != "Scatter2D": return False bgplot = self._refplot.clone() self._stack_databg = self.signal.clone() if self._stack_databg.numPoints() != bgplot.numPoints(): cfg.contur_log.warning( "%s : stack and background have unequal numbers of points. Skipping." % bgplot.path()) return False for i in range(0, len(self._stack_databg.points())): self._stack_databg.points()[i].setY( self._stack_databg.points()[i].y() * self._scaleFactorSig / self._scaleFactorData + bgplot.points()[i].y()) # set these to include only the BSM errors, since that is what is used in the test self._stack_databg.points()[i].setYErrs(self.signal.points()[i].yErrs()[0] * self._scaleFactorSig / self._scaleFactorData, self.signal.points()[i].yErrs()[1] * self._scaleFactorSig / self._scaleFactorData) if not self._useTheory: return try: # this only exists for scaled plots. bgplot = self.sm_prediction.plotObj[self._thy.path()] except KeyError: bgplot = self.sm_prediction.ao[self._thy.path()] self._stack_smbg = self.signal.clone() if self._stack_smbg.numPoints() != bgplot.numPoints(): cfg.contur_log.warning( "%s : stack and background have unequal numbers of points. Skipping." % bgplot.path()) return False for i in range(0, len(self._stack_smbg.points())): self._stack_smbg.points()[i].setY( self._stack_smbg.points()[i].y() * self._scaleFactorSig / self._scaleFactorData + bgplot.points()[i].y()) eu2 = (self.signal.points()[i].yErrs()[0]*self._scaleFactorSig/self._scaleFactorData)**2 + bgplot.points()[i].yErrs()[0]**2 ed2 = (self.signal.points()[i].yErrs()[1]*self._scaleFactorSig/self._scaleFactorData)**2 + bgplot.points()[i].yErrs()[1]**2 self._stack_smbg.points()[i].setYErrs(np.sqrt(eu2),np.sqrt(ed2)) def __doScale(self): """Private function to perform the normalisation of the signal """ if self.signal.type() != "Scatter2D": return cfg.contur_log.debug("Scaling {} by {}".format(self.signal.path(),self._scaleFactorSig)) self.signal.scale(self.signal.dim()-1, self._scaleFactorSig) def __fillBucket(self): """Create a block, contains the observables from this histogram and their correlation plus statistical metrics :Built members: * *block* (:class:`contur.block`) -- Automatically filled bucket containing statistical test pertaining to this histogram """ if len(self._ref.points()) != len(self.signal.points()): cfg.contur_log.critical( "Ref data and signal for {} have unequal numbers of points ({} vs {})".format( self.signal.path(), len(self._ref.points()), len(self.signal.points())) ) raise Exception # estimate the stat error on the expected signal. # TODO: signal errors are symmetrised here. Does that make any difference? (should not) yErrs = self.signal.yErrs() serrs = [] for epair in yErrs: # these are the MC stat errors serrs.append((abs(epair[0]) + abs(epair[1]))*0.5) serrs = np.array(serrs) self.bsm_values = ObservableValues(bin_widths=self._bin_widths, central_values=self.signal.yVals(), err_breakdown=serrs) try: # yoda 2 case sxsec = self.xsec.val() except AttributeError: # yoda 1 case sxsec = self.xsec.point(0).x() try: self._likelihood = lh.Likelihood(calculate=True, ratio=self._isRatio, lumi=self._lumi, lumi_fb=self._lumi_fb, profile=self._isProfile, sxsec=sxsec, bxsec=self._scaleFactorData, #< TODO: a hack for profiles etc. Improve? tags=self.signal.path(), measured_values=self.measured_values, sm_values=self.sm_values, bsm_values=self.bsm_values, expected_values=self.expected_values) except AttributeError as ate: cfg.contur_log.fatal("This can happen when your yodafile is corrupted: {}".format(ate)) traceback.print_exc() raise self._likelihood.pools = self.pool self._likelihood.subpools = self.subpool
[docs] def get_sm_pval(self): """ Calculate the pvalue compatibility (using chi2 survival) for the SM prediction and this measurement """ return self.likelihood.get_sm_pval()
@property def ref(self): """ Reference data, observed numbers input to test, scaled if required **type** (:class:`YODA.Scatter2D`) """ return self._ref @property def thy(self): """ Reference SM theory data, scaled if required **type** (:class:`YODA.Scatter2D`) """ return self._thy @property def stack_smbg(self): """Stacked, unscaled Signal+background for plotting (SM as background) **type** (:class:`YODA.Scatter2D`) """ return self._stack_smbg @property def stack_databg(self): """Stacked, unscaled Signal+background for plotting (data as background) **type** (:class:`YODA.Scatter2D`) """ return self._stack_databg @property def sigplot(self): """Signal for plotting **type** (:class:`YODA.Scatter2D`) """ return self._sigplot @sigplot.deleter def sigplot(self): del self._sigplot @property def refplot(self): """Reference data for plotting **type** (:class:`YODA.Scatter2D`) """ return self._refplot @property def thyplot(self): """Theory for plotting **type** (:class:`YODA.Scatter2D`) """ return self._thyplot @property def scaled(self): """Bool representing if there is additional scaling applied on top of luminosity **type** (``bool``) """ return self._isScaled @property def has_theory(self): """Bool representing if a theory prediction was found for the input signal **type** (``bool``) """ return (self.sm_values is not None) @property def signal_scale(self): """Scale factor applied to the signal histogram/scatter, derived generally from input nEv and xs **type** (``float``) """ return self._scaleFactorSig @property def data_scale(self): """Scale factor applied to the refdata histogram/scatter **type** (``float``) """ return self._scaleFactorData @property def likelihood(self): """The instance of :class:`~contur.factories.likelihood.Likelihood` derived from this histogram **type** (:class:`~contur.factories.likelihood.Likelihood`) """ return self._likelihood @likelihood.setter def likelihood(self,lh): self._likelihood = lh def __repr__(self): if not self.signal.path(): tag = "Unidentified Source" else: tag = self.signal.path() return "%s from %s, with %s" % (self.__class__.__name__, tag, self._likelihood)
[docs] class ObservableValues(object): """ A book-keeping class to contain all the numerical info (central values, err_breakdown, covariance) for a given binned observable. """ def __init__(self, bin_widths=None, central_values=None, err_breakdown=None, covariance_matrix=None, diagonal_matrix=None, isref=False): self.bin_widths = bin_widths self.central_values= central_values self.err_breakdown = err_breakdown self.covariance_matrix = covariance_matrix self.diagonal_matrix = diagonal_matrix if err_breakdown is not None and diagonal_matrix is None: if isref: print("USING ERR BREAKDOWN") self.diagonal_matrix = np.diag(err_breakdown * err_breakdown)