Source code for contur.util.utils

# -*- python -*-

"""
Miscellaneous helper functions that may be used by more than one contur submodule

"""

import os, glob, subprocess, sys
from builtins import input

import contur
import contur.config.config as cfg
import contur.config.version
import contur.factories.likelihood as lh
import contur.data as cdb

from rivet import mkStdPlotParser

import importlib

## Import the tqdm progress-bar if possible, otherwise fall back to a safe do-nothing option
[docs] def progress_bar(iterable, **kwargs): if cfg.multi_p: try: from tqdm import tqdm return tqdm(iterable,**kwargs) except ImportError: return iterable else: return iterable
[docs] def splitPath(path): """ Take a yoda histogram path and return the analysis name and the histogram name :arg path: the full path of a yoda analysis object :type: String """ from rivet.aopaths import AOPath aop = AOPath(path) parts = AOPath.dirnameparts(aop) analysis = parts[len(parts)-1] h_name = AOPath.basename(aop) return analysis, h_name
[docs] def mkoutdir(outdir): """ Function to make an output directory if it does not already exist. Also tests if an existing directory is write-accessible. """ if not os.path.exists(outdir): try: os.makedirs(outdir) except: msg = "Can't make output directory '%s'" % outdir raise Exception(msg) if not os.access(outdir, os.W_OK): msg = "Can't write to output directory '%s'" % outdir raise Exception(msg)
[docs] def write_banner(): """ Write a text banner giving the version and pointing to the documentation """ msgs = ["Contur version {}".format(contur.config.version.version), "See https://hepcedar.gitlab.io/contur-webpage/"] for m in msgs: try: cfg.contur_log.info(m) except: print(m)
[docs] def insert_line_break(string, length=25): """ insert a LaTeX newline after <length> characters, so that labels stay on the plot canvas """ if len(string) <= length: # If the string is 20 characters or less, there is no need to break it up. return string # Initialize the list of fragments. fragments = [] # Split the string into words using whitespace as the separator. words = string.split() # Initialize the current fragment with the first word. current_fragment = words[0] # Iterate over the remaining words. for word in words[1:]: # If adding the next word to the current fragment would make the fragment longer than 20 characters, # add the current fragment to the list of fragments and start a new fragment with the current word. if len(current_fragment) + len(word) + 1 > length: fragments.append(current_fragment) current_fragment = word else: # Otherwise, add the next word to the current fragment with a space separator. current_fragment += ' ' + word # Add the final fragment to the list of fragments. fragments.append(current_fragment) # Join the fragments with line breaks. result = r'\newline '.join(fragments) return result
[docs] def walklevel(some_dir, level=1): """ Like os.walk but can specify a level to walk to useful for managing directories a bit better https://stackoverflow.com/questions/229186/os-walk-without-digging-into-directories-below """ some_dir = some_dir.rstrip(os.path.sep) try: assert os.path.isdir(some_dir) except AssertionError: cfg.contur_log.critical("{} is not a directory".format(some_dir)) sys.exit(1) num_sep = some_dir.count(os.path.sep) for root, dirs, files in os.walk(some_dir): yield root, dirs, files num_sep_this = root.count(os.path.sep) if num_sep + level <= num_sep_this: del dirs[:]
[docs] def newlogspace(start, stop, num=50, endpoint=True, base=10.0, dtype=None): """ Numpy logspace returns base^start to base^stop, we modify this here so it returns logspaced between start and stop """ import numpy return numpy.logspace(numpy.log(start)/numpy.log(base), numpy.log(stop)/numpy.log(base), num, endpoint, base, dtype)
[docs] def get_inspire(inspire_id): """ Function to query InspireHEP database and return a dictionary containing the metadata extracted/adapted from rivet-findid if contur.config.config.offline is True, no web query is made and returned variables are set to "Offline" if contur.config.config.offline is False, but web query fails, returned variables are set to "Failed" :arg inspire_id: the ID of the publication on Inspire :type inspire_id: ``string`` :return: pub_data ``dictionary`` -- selected metadata as a dictionary of strings """ if not cfg.offline: # Get the necessary packages. try: from urllib.request import urlopen import json except ImportError: from urllib2 import urlopen import json except Exception as e: cfg.ConturError("Error importing URL modules: {}".format(e)) cfg.ConturError("Switching to offline mode") cfg.offline=True pub_data = {} if cfg.offline: pub_data['bibkey']="Offline" pub_data['bibtex']="Offline" else: url = "https://inspirehep.net/api/literature/{}".format(inspire_id) ## Get and test JSON try: cfg.contur_log.debug("Querying inspire: {}".format(inspire_id)) response = urlopen(url) cfg.contur_log.debug("Success") except Exception as e: pub_data['bibkey']="Failed" pub_data['bibtex']="Failed" cfg.contur_log.error("Error opening URL {}: {}".format(url,e)) return pub_data metadata = json.loads(response.read().decode("utf-8")) if metadata.get("status", "") == 404: raise cfg.ConturError('ERROR: id {} not found in the InspireHEP database\n'.format(inspire_id)) try: md=metadata["metadata"] except KeyError as ke: cfg.contur_log.error("Could not find metadata for inspire ID {}".format(inspire_id)) pub_data['bibkey']="Failed" pub_data['bibtex']="Failed" return pub_data pub_data['bibkey']=str(md["texkeys"][0]) biburl = metadata["links"]["bibtex"] cfg.contur_log.debug("Querying inspire: {}".format(biburl)) try: pub_data['bibtex']=urlopen(biburl).read().decode() except Exception as e: cfg.contur_log.error("Failed to read bibtex from {} for inspire ID {}".format(biburl,inspire_id)) pub_data['bibtex']="Failed" return pub_data cfg.contur_log.debug("Success") return pub_data
[docs] def permission_to_continue(message): """Get permission to continue program""" permission = "" while permission.lower() not in ['no', 'yes', 'n', 'y']: permission = str(input("{}\n [y/N]: ".format(message))) if len(permission)==0: permission = 'N' if permission.lower() in ['y', 'yes']: return True else: return False
[docs] class Plot(dict): ''' A tiny Plot object to help writing out the head in the .dat file ''' def __repr__(self): return "# BEGIN PLOT\n" + "\n".join("%s=%s" % (k, v) for k, v in self.items()) + "\n# END PLOT\n\n"
[docs] def cleanupCommas(text): ''' Replace commas and multiple spaces in text by single spaces ''' text=text.replace(","," ") while " " in text: text=text.replace(" ", " ") return text.strip()
[docs] def remove_brackets(text): ''' remove any brackets from text, and try to turn it into a float. return None if not possible. ''' res = text.split("(")[0]+text.split(")")[-1] try: res = float(res) except: res = None return res
[docs] def compress_particle_info(name,info): ''' Turns a dictionary of properties of the particle <name> into simple string-formatted dictionary suitable for storing as parameters: (removes commas, backslashes and spaces and puts <particlename>_<property>) ''' info_dict = {} info_dict["{}_mass".format(name)]=info["mass"] info_dict["{}_width".format(name)]=info["width"] for decay, bf in info.items(): if not (decay=="mass" or decay=="width"): decay = decay.replace(",","") decay = decay.replace(" ","") decay = decay.replace("\\","") info_dict["{}_{}".format(name,decay)]=bf return info_dict
[docs] def compress_xsec_info(info,matrix_elements): ''' compresses a dict of subprocess cross sections into a format they can be stored as AUX params (removes commas, backslashes and spaces and puts AUX:<name>_<property>) Also, if matrix_elements is not None, then remove any which are not in it. ''' info_dict = {} for name, value in info.items(): name = name.replace(",","") name = name.replace(" ","") name = name.replace("\\rightarrow","_") name = name.replace("\\","") if matrix_elements is None or name in matrix_elements: name = "AUX:{}".format(name) info_dict[name]=value return info_dict
[docs] def hack_journal(bibtex): ''' Add a dummy journal field if absent, to stop sphinx barfing. ''' if "journal" in bibtex: return bibtex else: newbibtex = bibtex[:-3]+',journal = "no journal"\n}\n' return newbibtex
[docs] def find_ref_file(analysis): ''' return the REF data file name and path for analysis with name a_name if not found, return an empty string. ''' import rivet yoda_name = analysis.shortname+".yoda.gz" f = rivet.findAnalysisRefFile(yoda_name) return f
[docs] def find_thy_predictions(analysis,prediction_id=None): ''' return the THY data file name and path for analysis with name a_name and chosen ID. if not found, return an empty string. ''' import rivet predictions = cdb.static_db.get_sm_theory(analysis.name) if prediction_id is None: return predictions if predictions is not None: for sm in predictions: if sm.id == prediction_id: return sm return None, None
[docs] def get_beam_dirs(beams): """ return a dict of the paths (under cfg.grid) containing the name of each beam, keyed on beam beams = a list of beams to look for. """ scan_dirs = {} for root, dirnames, files in sorted(walklevel(cfg.grid,1)): for beam in beams: for dir in dirnames: if beam.id in dir: dir_path = os.path.join(cfg.grid,dir) try: if not dir_path in scan_dirs: scan_dirs[beam.id].append(dir_path) else: cfg.contur_log.warning("Directory name {} is ambiguous as to what beam is belongs to.".format(dir_path)) except KeyError: scan_dirs[beam] = [os.path.join(cfg.grid,dir)] return scan_dirs
[docs] def analysis_select(name, veto_only=False): """ return true if the analysis passes the select/veto conditions, false otherwise """ import re for pattern in cfg.vetoAnalyses: if re.compile(pattern).search(name): return False if veto_only: return True if len(cfg.onlyAnalyses)>0: for pattern in cfg.onlyAnalyses: if re.compile(pattern).search(name): return True return False return True
[docs] def executeScript(script, plot_dir = "", multiprocess=False, shared_modules_list=[], print_script=True): """ execute a single Python script, with argument string arg """ if not os.path.isfile(script): raise FileNotFoundError("Python script {} not found!".format(script)) if print_script: try: cfg.contur_log.info("Executing {}".format(script)) except: print("Executing {}".format(script)) try: if multiprocess: # load shared imports mpl = importlib.import_module(shared_modules_list[0]) np = importlib.import_module(shared_modules_list[1]) sys_lib = importlib.import_module(shared_modules_list[2]) os_lib = importlib.import_module(shared_modules_list[3]) # global variables (including imports) that are passed to subprocesses if plot_dir != "": script_globals = {'mpl': mpl, 'np': np, 'sys' : sys_lib, 'os' : os_lib, '__file__': script, 'YODA_USER_PLOT_PATH' : plot_dir} else: script_globals = {'mpl': mpl, 'np': np, 'sys' : sys_lib, 'os' : os_lib, '__file__': script} # execute script and pass it the dict. of global variables exec(open(script).read(), script_globals) else: subprocess.check_call([script + " " + plot_dir], shell=True) except Exception as ex: print("Unexpected error when executing ", script) print(ex)
[docs] def make_mpl_plots(pyScripts,numcores=1): # make the necessary plot directories for script, dir in pyScripts: if len(dir)>0: mkoutdir(dir) cfg.contur_log.info("Executing {} plotting scripts.".format(len(pyScripts))) if cfg.multi_p: # manage matplotlib and numpy processes centrally, # so that they are not imported for each individual script from multiprocessing import Pool, Manager manager = Manager() manager_module = manager.Namespace() manager_module.shared_modules = ["matplotlib", "numpy", "sys", "os"] # Call multiprocessing pool to generate plots p = Pool(processes=numcores) try: # show progress bar only import tqdm jobs = [p.apply_async(func=executeScript, args=(pyScript[0], pyScript[1], True, manager_module.shared_modules, False)) for pyScript in pyScripts] p.close() for job in tqdm.tqdm(jobs): job.get() except ImportError: # execute without progress bar but print path to executed python scripts to screen p.starmap(func=executeScript, iterable=[(pyScript[0], pyScript[1], True, manager_module.shared_modules) for pyScript in pyScripts]) # submit scripts serially else: for script in pyScripts: executeScript(script[0], script[1], False)
[docs] def get_numcores(nreq): from multiprocessing import cpu_count if nreq == 0: try: numcores = cpu_count() except: numcores = 1 # user-specified number else: numcores = nreq return numcores
[docs] def pairwise(iterable): """ Iterates pairwise over a list. Example: s -> (s0, s1), (s2, s3), (s4, s5), .. """ a = iter(iterable) return zip(a, a)