# Source code for pyproteome.motifs.motif

'''
This module provides functionality for finding motifs in sequences.

Functionality includes n-mer generation.
'''

from __future__ import division

from collections import defaultdict
from copy import copy, deepcopy
from functools import partial
import logging
import multiprocessing
import pickle
import random
import re

try:
    # collections.Iterable was removed in Python 3.10
    from collections.abc import Iterable
except ImportError:  # Python 2 fallback
    from collections import Iterable

import pandas as pd
from scipy.stats import hypergeom


# Module-level logger shared by all motif-search routines in this module.
LOGGER = logging.getLogger('pyproteome.motif')


[docs]class Motif: ''' Contains a motif that may match to one or more protein sequences. Matches include the regular single-letter amino acid names as well as phosphosites for serine, threonine, and tyrosine, non-polar amino acids, and positively and negatively charged amino acids. Attributes ---------- motif : str Examples -------- >>> import pyproteome >>> motif = pyproteome.Motif('O..x.-+') >>> 'IEFyFER' in motif True >>> 'IEFyFED' in motif False >>> 'FFFFFFR' in motif False ''' def __init__(self, motif): # Set _motif and _re explicitly, py2.7 does not call motif's setter # if we just set self.motif here... self._motif = motif self._re = self._compile_re(motif) char_mapping = { 'x': 'st', 'O': 'MILV', '-': 'DE', '+': 'RK', '.': 'ACDEFGHIKLMNPQRSTVWYystO-+x', } def __copy__(self): cls = self.__class__ result = cls.__new__(cls) result.__dict__.update(self.__dict__) return result def __deepcopy__(self, memo): cls = self.__class__ result = cls.__new__(cls) memo[id(self)] = result for k, v in self.__dict__.items(): if k == '_re': continue setattr(result, k, deepcopy(v, memo)) setattr(result, '_re', result._compile_re(result._motif)) return result @property def motif(self): return self._motif @motif.setter def motif(self, value): self._motif = value self._re = self._compile_re(value) def _compile_re(self, motif): ret = '' for char in motif: if char == '.' or char not in self.char_mapping: ret += char else: ret += '[{}{}]'.format(self.char_mapping[char], char) return re.compile(ret)
[docs] def children(self): for index, x in enumerate(self.motif): if x != '.': continue for char in self.char_mapping.get(x, ''): yield Motif( self.motif[:index] + char + self.motif[index + 1:], )
[docs] def pairwise_children(self, hit_list): # First build a list of all of the AAs in the hits for this motif. We # don't want to generate child motifs that do not at least match any # peptides in that list. aa_is = dict( ( (i, j), set((hit[i], hit[j]) for hit in hit_list[self]), ) for i in range(len(self.motif)) for j in range(i + 1, len(self.motif)) ) for key, val in aa_is.items(): to_add = set() for special_char in 'O-+x': for aa_i, aa_j in val: if aa_i in self.char_mapping[special_char]: to_add.add((special_char, aa_j)) if aa_j in self.char_mapping[special_char]: to_add.add((aa_i, special_char)) if aa_i in self.char_mapping[special_char] and \ aa_j in self.char_mapping[special_char]: to_add.add((special_char, special_char)) val.update(to_add) # Then generate motifs with AAs that match the hit list for i, char_i in enumerate(self.motif): if char_i != '.': continue for j, char_j in enumerate(self.motif[i + 1:], start=i + 1): if char_j != '.': continue for aa_i, aa_j in aa_is[(i, j)]: yield Motif( self.motif[:i] + aa_i + self.motif[i + 1:j] + aa_j + self.motif[j + 1:] )
def __repr__(self): return '<pyproteome.Motif: {}>'.format(self.motif) def __str__(self): return self.motif def __hash__(self): return hash(self.motif) def __contains__(self, other): if isinstance(other, Motif): other = other.motif if not isinstance(other, str): raise TypeError if len(self.motif) != len(other): raise ValueError('Incompatible sequence lengths.') return self.match(other) def __eq__(self, other): if isinstance(other, Motif): return self.motif == other.motif if not isinstance(other, str): raise TypeError if len(self.motif) != len(other): raise ValueError('Incompatible sequence lengths.') return self.match(other) def __lt__(self, other): if not isinstance(other, Motif): raise TypeError return self.motif < other.motif
[docs] def match(self, other): ''' Match a given sequence to this motif. Parameters ---------- other : str Returns ------- bool ''' return bool(self._re.match(other))
def get_nmer_args(kwargs):
    '''
    Extract all arguments from kwargs that are used by
    :func:`.generate_n_mers`.

    Parameters
    ----------
    kwargs : dict
        Mutated in place: extracted keys are removed.

    Returns
    -------
    dict
    '''
    # (key, default) pairs consumed by generate_n_mers.
    defaults = [
        ('all_matches', False),
        ('mods', [(None, 'Phospho')]),
        ('n', 15),
        ('use_ptms', True),
        ('use_nterms', False),
        ('use_cterms', False),
    ]
    return {
        key: kwargs.pop(key, default)
        for key, default in defaults
    }
def generate_n_mers(
    sequences,
    n=15,
    all_matches=True,
    fill_left='A',
    fill_right='A',
    mods=None,
    use_ptms=True,
    use_nterms=False,
    use_cterms=False,
):
    '''
    Generate n-mers around all sites of modification in sequences.

    Parameters
    ----------
    sequences : list of :class:`pyproteome.data_sets.sequence.Sequence`
    n : int, optional
        Window length; must be odd so the site sits exactly in the center.
    all_matches : bool, optional
        Generate n-mers for all protein matches else just the first match.
    fill_left : str, optional
    fill_right : str, optional
    mods : list of tuple of str, str, optional
    use_ptms : bool, optional
    use_nterms : bool, optional
    use_cterms : bool, optional

    Returns
    -------
    set of str
    '''
    # Check n is odd
    assert n % 2 == 1

    if not isinstance(sequences, (pd.Series, list, tuple)):
        sequences = [sequences]

    half = n // 2

    def _window(full_seq, center):
        # Pad with fill characters wherever the window runs off either
        # end of the protein sequence; the site itself is lower-cased.
        return (
            fill_left * (half - center) +
            full_seq[max([center - half, 0]):center] +
            full_seq[center].lower() +
            full_seq[center + 1:center + half + 1] +
            fill_right * (center - len(full_seq) + half + 1)
        )

    def _positions(seq):
        # Yield (absolute position, protein match) for every site of
        # interest on every (or just the first) protein match.
        limit = None if all_matches else 1

        for match in seq.protein_matches[:limit]:
            if use_ptms:
                for mod in seq.modifications.get_mods(mods):
                    yield match.rel_pos + mod.rel_pos, match

            if use_nterms:
                yield match.rel_pos, match

            if use_cterms:
                yield match.rel_pos + len(seq.pep_seq) - 1, match

    return set(
        _window(match.protein.full_sequence, pos)
        for seq in sequences
        for pos, match in _positions(seq)
    )
def _motif_stats(motif, fore_size, back_size, done, pp_dist=None): fore_hits, back_hits = done[motif] # Re-calculate the P enrichment score associated with this motif p_value = _motif_sig(fore_hits, fore_size, back_hits, back_size) ppvalue = _pp_sig(p_value, pp_dist) if pp_dist else None return ( motif, fore_hits, fore_size, back_hits, back_size, p_value, ppvalue, ) def _is_a_submotif_with_same_size(submotif, fore_hits, done): ''' Test whether the 'submotif' is an ancestor of any known motif, and matches the same number of foreground sequences. ''' # We want to know if this 'submotif' contains any completed motifs return any( checked in submotif for checked, checked_hits in done.items() if fore_hits == checked_hits[0] and checked != submotif ) def _filter_less_specific(prev_pass, done): return set( motif for motif in prev_pass if not any( # other in motif: other is +more+ specific than motif other in motif and done[other][0] >= done[motif][0] for other in prev_pass if other != motif ) ) def _check_anchestry(motif, parent, done): motif_length = len(motif.motif) for index, char in enumerate(motif.motif): if index == motif_length // 2 or char != '.': continue new_motif = Motif( motif.motif[:index] + '.' 
+ motif.motif[index + 1:] ) if new_motif != parent and new_motif in done: return True return False def _count_occurences(motif, parent, hit_list): hits = [ i for i in hit_list[parent] if motif.match(i) ] hit_list[motif] = hits return len(hits) def _motif_sig(fore_hits, fore_size, back_hits, back_size): return ( 1 - hypergeom.cdf(fore_hits, back_size, back_hits, fore_size) + hypergeom.pmf(fore_hits, back_size, back_hits, fore_size) ) def _pp_sig(p_value, pp_dist): return len([i for i in pp_dist if i < p_value]) / len(pp_dist) # Taken from https://stackoverflow.com/questions/1023038/ def _lowpriority(): ''' Set the priority of the process to below-normal.''' import sys try: sys.getwindowsversion() except AttributeError: isWindows = False else: isWindows = True if isWindows: # Based on: # 'Recipe 496767: Set Process Priority In Windows' on ActiveState # http://code.activestate.com/recipes/496767/ import win32api import win32process import win32con pid = win32api.GetCurrentProcessId() handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, pid) win32process.SetPriorityClass( handle, win32process.BELOW_NORMAL_PRIORITY_CLASS, ) else: import os os.nice(1) def _random_pdist(x, background, fore_size, kwargs): _lowpriority() return motif_enrichment( random.sample(background, fore_size), background, force=True, **kwargs )[1] def _generate_ppdist( background, fore_size, p_iter, cpu_count=None, **kwargs ): if cpu_count is None: try: cpu_count = multiprocessing.cpu_count() - 1 except NotImplementedError: cpu_count = 1 kwargs = kwargs.copy() kwargs['pp_value'] = False pp_dist = [] pool = multiprocessing.Pool( processes=cpu_count, ) LOGGER.info( 'Calculating distribution of {} p-values using {} CPUs'.format( p_iter, cpu_count, ) ) for ind, p_dist in enumerate( pool.imap_unordered( partial( _random_pdist, background=background, fore_size=fore_size, kwargs=kwargs, ), range(p_iter), ), start=1, ): pp_dist.append( min(p_dist + [kwargs.get('sig_cutoff')]) ) if ind % (p_iter // 
min([p_iter, 10])) == 0: LOGGER.info( 'Calculated {}/{} pvals'.format(ind, p_iter) ) return pp_dist def _make_index(args): def _to_tuple(i): if not isinstance(i, str) and isinstance(i, Iterable): if isinstance(i, list) or isinstance(i, set): i = sorted(i) return tuple(_to_tuple(j) for j in i) return i return _to_tuple(args) def _open_cache(): try: with open('.motif_cache.pickle', 'rb') as f: return pickle.load(f) except (OSError, IOError): return {} except (EOFError, pickle.UnpicklingError, KeyError, TypeError): LOGGER.warning('Unable to open motif cache') return {} def _get_cache(args): return _open_cache().get(_make_index(args), None) def _add_cache(args, ret): LOGGER.info('Adding motifs to cache') cache = _open_cache() cache[_make_index(args)] = ret try: with open('.motif_cache.pickle', 'wb') as f: pickle.dump(cache, f) return ret except OSError: return ret
def run_motif_enrichment(data, f, **kwargs):
    '''
    Wraps :func:`.motif_enrichment`, generating the list of foreground and
    background peptide sequences from a data set.

    Parameters
    ----------
    data : :class:`pyproteome.data_sets.data_set.DataSet`
    f : dict or list of dict
        Argument passed to
        :func:`pyproteome.data_sets.data_set.DataSet.filter`.
    kwargs : dict
        Arguments passed to :func:`.motif_enrichment`.

    Returns
    -------
    df : :class:`pandas.DataFrame`
    p_dist : list of float
    pp_dist : list of float
    '''
    nmer_args = get_nmer_args(kwargs)

    # Foreground: peptides passing the filter; background: all peptides.
    fore = sorted(
        generate_n_mers(data.filter(f)['Sequence'], **nmer_args)
    )
    back = sorted(
        generate_n_mers(data['Sequence'], **nmer_args)
    )

    return motif_enrichment(fore, back, **kwargs)
def motif_enrichment(
    foreground, background,
    sig_cutoff=0.01,
    min_fore_hits=0,
    start_letters=None,
    pp_value=False,
    pp_iterations=100,
    cpu_count=None,
    force=False,
):
    '''
    Calculate motifs significantly enriched in a set of sequences. Uses a
    depth-first search algorithm to find discrete motifs that are enriched
    in a foreground set compared to a given background [1]_.

    Parameters
    ----------
    foreground : list of str
    background : list of str
    sig_cutoff : float, optional
    min_fore_hits : int, optional
    start_letters : list of str, optional
    pp_value : bool, optional
    pp_iterations : int, optional
    cpu_count : int, optional
        Number of CPUs to use when calculating pp-values, does not apply
        to a single motif-enrichment process.
    force : bool, optional
        Skip the on-disk motif cache for both reading and writing.

    Returns
    -------
    df : :class:`pandas.DataFrame`
    p_dist : list of float
    pp_dist : list of float

    Notes
    -----
    .. [1] Joughin, Brian a et al. 'An Integrated Comparative
       Phosphoproteomic and Bioinformatic Approach Reveals a Novel Class
       of MPM-2 Motifs Upregulated in EGFRvIII-Expressing Glioblastoma
       Cells.' Molecular bioSystems 5.1 (2009): 59-67.
    '''
    p_dist = []

    def _search_children(children, parent=None):
        '''
        Run a depth-first search over a given motif.
        '''
        motif_hits = set()

        for motif in children:
            if motif in visited:
                failed['checkpat'] += 1
                continue

            # Mark motif as visited
            visited.add(motif)

            if motif in done:
                failed['done'] += 1
                continue

            # Calculate the number of foreground hits
            fore_hits = _count_occurences(motif, parent, fg_hit_list)

            if fore_hits < min_fore_hits:
                # Fixed: was `failed['count'] = 1`, which never
                # accumulated like the other failure counters.
                failed['count'] += 1
                del fg_hit_list[motif]
                continue

            # Check if we've already completed the search for a parent of
            # this motif
            if parent and _check_anchestry(motif, parent, done):
                failed['anchestry'] += 1
                del fg_hit_list[motif]
                continue

            # Shortcut calculating back-hits if we can help it: even with
            # the best possible background count, is significance possible?
            best_p = _motif_sig(fore_hits, fore_size, fore_hits, back_size)
            if best_p > sig_cutoff:
                failed['bestsig'] += 1
                del fg_hit_list[motif]
                continue

            if _is_a_submotif_with_same_size(motif, fore_hits, done):
                failed['sub'] += 1
                del fg_hit_list[motif]
                continue

            # Calculate the number of background hits
            back_hits = _count_occurences(motif, parent, bg_hit_list)

            # Check the signifiance of the motif
            p_value = _motif_sig(fore_hits, fore_size, back_hits, back_size)
            p_dist.append(p_value)

            if p_value > sig_cutoff:
                failed['sig'] += 1
                del fg_hit_list[motif]
                del bg_hit_list[motif]
                continue

            # If we get this far, we have a hit!
            motif_hits.add(motif)
            failed['succeed'] += 1
            done[motif] = (fore_hits, back_hits)

            # Now try adding to the list of modifications on the motif to
            # find something more specific.
            motif_hits.update(
                _search_children(
                    children=motif.children(),
                    parent=motif,
                )
            )
            motif_hits.update(
                _search_children(
                    children=motif.pairwise_children(fg_hit_list),
                    parent=motif,
                )
            )

            del fg_hit_list[motif]
            del bg_hit_list[motif]

        return motif_hits

    fore_size = len(foreground)
    back_size = len(background)

    assert back_size > 0

    # No foreground peptides: return an empty (but well-formed) result.
    if fore_size == 0:
        return (
            pd.DataFrame(
                columns=[
                    'Motif',
                    'Foreground Hits',
                    'Foreground Size',
                    'Background Hits',
                    'Background Size',
                    'p-value',
                    'pp-value',
                ],
            ),
            [],
            [],
        )

    motif_length = len(background[0])

    # All n-mers must share one odd length so a central start motif exists.
    assert motif_length % 2 == 1
    assert all(len(i) == motif_length for i in foreground)
    assert all(len(i) == motif_length for i in background)

    # Cache motif analyses
    cache_args = (
        foreground, background, sig_cutoff, min_fore_hits, start_letters,
        pp_value, pp_iterations if pp_value else None,
    )

    if not force:
        cache = _get_cache(cache_args)

        if cache is not None:
            LOGGER.info('Loading motifs from cache')
            return cache

    LOGGER.info(
        'Starting analysis, n={}, N={}'.format(
            fore_size, back_size,
        )
    )
    LOGGER.info(
        'Motif sig cutoff: {}, Minimum hits: {}'.format(
            sig_cutoff, min_fore_hits,
        )
    )

    visited, done = set(), {}
    fg_hit_list = defaultdict(list)
    bg_hit_list = defaultdict(list)
    failed = defaultdict(int)

    # Set the starting motif and begin adding modifications to it.
    # Note: Order matters, put less specific to the end of this list
    if start_letters is None:
        start_letters = [
            # 'k',
            's', 't', 'y', 'x',
        ]

    starts = [
        Motif(
            '.' * (motif_length // 2) + letter + '.' * (motif_length // 2)
        )
        for letter in start_letters
    ]
    LOGGER.info(
        'Starting Motifs: {}'.format(', '.join(str(i) for i in starts))
    )

    for start in starts:
        fg_hit_list[start] = foreground
        bg_hit_list[start] = background

    first_pass = set()

    try:
        for start in starts:
            first_pass.update(
                _search_children(
                    children=start.children(),
                    parent=start,
                )
            )
            first_pass.update(
                _search_children(
                    children=start.pairwise_children(fg_hit_list),
                    parent=start,
                )
            )
            del fg_hit_list[start]
            del bg_hit_list[start]
    except KeyboardInterrupt:
        LOGGER.info('Keyboard interrupted motif search')
        return
    finally:
        # Output some debugging information to compare with Brian's output
        LOGGER.info(
            '\n'.join(
                'FAIL_{}: {}'.format(key.upper(), failed[key])
                for key in [
                    'done', 'sub', 'count', 'sig', 'bestsig', 'anchestry',
                    'checkpat',
                ]
            ) + '\nSUCCEED: {}'.format(failed['succeed'])
        )

    LOGGER.info('First pass: {} motifs'.format(len(first_pass)))

    second_pass = _filter_less_specific(first_pass, done)

    LOGGER.info(
        'Second pass (less specific motifs removed): {} motifs'
        .format(len(second_pass))
    )

    pp_dist = []

    if len(second_pass) > 0:
        pp_dist = _generate_ppdist(
            background, fore_size, pp_iterations,
            sig_cutoff=sig_cutoff,
            min_fore_hits=min_fore_hits,
            start_letters=start_letters,
            cpu_count=cpu_count,
        ) if pp_value else None

        # Only validate the size when a distribution was actually
        # generated; an empty second pass should still fall through and
        # produce an empty DataFrame, consistent with fore_size == 0.
        if pp_dist is not None and len(pp_dist) != pp_iterations:
            LOGGER.info('pp-dist size mismatch')
            return

    # Finally prepare the output as a sorted list with the motifs and
    # their associated statistics.
    df = pd.DataFrame(
        data=[
            _motif_stats(motif, fore_size, back_size, done, pp_dist=pp_dist)
            for motif in second_pass
        ],
        columns=[
            'Motif',
            'Foreground Hits',
            'Foreground Size',
            'Background Hits',
            'Background Size',
            'p-value',
            'pp-value',
        ],
    )
    df.sort_values(by=['pp-value', 'p-value', 'Motif'], inplace=True)
    df.reset_index(drop=True, inplace=True)

    ret = (df, p_dist, pp_dist)

    return ret if force else _add_cache(cache_args, ret)