diff options
author | John Glover <glover.john@gmail.com> | 2010-10-18 17:32:05 +0100 |
---|---|---|
committer | John Glover <glover.john@gmail.com> | 2010-10-18 17:32:05 +0100 |
commit | 30755b92afeae5a5a32860b4f4297180f6d3398d (patch) | |
tree | c9332a65adc6a27016678fccee6ce979d87fed07 /mq.py | |
parent | 58a7c36c5a219d8306f276db157097ac30782079 (diff) | |
download | simpl-30755b92afeae5a5a32860b4f4297180f6d3398d.tar.gz simpl-30755b92afeae5a5a32860b4f4297180f6d3398d.tar.bz2 simpl-30755b92afeae5a5a32860b4f4297180f6d3398d.zip |
Moved project over to Git
Diffstat (limited to 'mq.py')
-rw-r--r-- | mq.py | 192 |
1 files changed, 192 insertions, 0 deletions
@@ -0,0 +1,192 @@ +# Copyright (c) 2009 John Glover, National University of Ireland, Maynooth +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +import simpl +import numpy as np + +class MQPeakDetection(simpl.PeakDetection): + """Peak detection, based on the McAulay and Quatieri (MQ) algorithm. + + A peak is defined as the point in the spectrum where the slope changes from + position to negative. Hamming window is used, window size must be (at least) + 2.5 times the average pitch. During voiced sections of speech, the window size + is updated every 0.25 secs, to the average pitch. During unvoiced sections, + the window size is fixed at the value of the last voiced frame. Once the width + is specified, the Hamming window is computed and normalised and the STFT is + calculated.""" + def __init__(self): + simpl.PeakDetection.__init__(self) + self._window = simpl.zeros(self._window_size) + self._create_analysis_window() + self._fundamental = float(self._sampling_rate) / self._window_size + + def set_window_size(self, window_size): + self._window_size = window_size + self._fundamental = float(self._sampling_rate) / window_size + + def _create_analysis_window(self): + "Creates the analysis window, a normalised hamming window" + self._window = np.hamming(self._window_size) + s = 0 + for i in range(self._window_size): + s += self._window[i] + self._window /= s + + def find_peaks_in_frame(self, frame): + """Selects the highest peaks from the given spectral frame, up to a maximum of + self._max_peaks peaks.""" + current_peaks = [] + # fft of frame + f = np.fft.rfft(frame * self._window) + spectrum = abs(f) + # find all peaks in the spectrum + prev_mag = np.abs(spectrum[0]) + current_mag = np.abs(spectrum[1]) + next_mag = 0.0 + for bin in range(2, len(spectrum)-1): + next_mag = np.abs(spectrum[bin]) + if (current_mag > prev_mag and + current_mag > next_mag): + p = simpl.Peak() + p.amplitude = current_mag + p.bin_number = bin - 1 + p.frequency = (bin - 1) * self._fundamental + p.phase = np.angle(spectrum[bin-1]) + current_peaks.append(p) + prev_mag = current_mag + current_mag = next_mag + # sort peaks, largest amplitude first, and up to a max of self.num_peaks peaks + current_peaks.sort(cmp=simpl.compare_peak_amps) + current_peaks.reverse() + if len(current_peaks) > self._max_peaks: + current_peaks = current_peaks[0:self._max_peaks] + # put back into ascending frequency order + current_peaks.sort(cmp=simpl.compare_peak_freqs) + return current_peaks + + +class MQPartialTracking(simpl.PartialTracking): + "Partial tracking, based on the McAulay and Quatieri (MQ) algorithm" + def __init__(self): + simpl.PartialTracking.__init__(self) + self._matching_interval = 100 # peak matching interval, in Hz + self._current_frame = None # current frame in the peak tracking algorithm + + def _find_closest_match(self, peak, frame, direction='backwards'): + """Find a candidate match for peak in frame if one exists. This is the closest + (in frequency) match that is within self._matching_interval.""" + free_peaks = [] + for p in frame: + if p.is_free(direction): + free_peaks.append(p) + distances = [abs(peak.frequency - p.frequency) for p in free_peaks] + if len(distances): + min_distance_position = min(xrange(len(distances)), key=distances.__getitem__) + if min(distances) < self._matching_interval: + return free_peaks[min_distance_position] + return None + + def _get_free_peak_below(self, peak, frame, direction='backwards'): + """Returns the closest unmatched peak in frame with a frequency less than peak.frequency.""" + # find peak in frame + for peak_number, p in enumerate(frame): + if p == peak: + # go back through lower peaks (in order) and return the first unmatched + current_peak = peak_number - 1 + while current_peak >= 0: + if frame[current_peak].is_free(direction): + return frame[current_peak] + current_peak -= 1 + return None + return None + + def _kill_track(self, peak, next_frame): + """When a track dies it is matched to itself in the next frame, with 0 amplitude.""" + if peak.is_free(): # this may be a 0 amp (dead) peak from a previous tracking step + next_peak = simpl.Peak() + next_peak.amplitude = 0 + next_peak.frequency = peak.frequency + self.get_partial(peak.partial_id).add_peak(next_peak) + + def update_partials(self, frame, frame_number): + """Streamable (real-time) MQ peak-tracking. + + 1. If there is no peak within the matching interval, the track dies. + If there is at least one peak within the matching interval, the closest match + is declared a candidate match. + + 2. If there is a candidate match from step 1, and it is not a closer match to + any of the remaining unmatched frequencies, it is declared a definitive match. + If the candidate match has a closer unmatched peak in the previous frame, it is + not selected. Instead, the closest lower frequency peak to the candidate is + checked. If it is within the matching interval, it is selected as a definitive + match. If not, the track dies. + In any case, step 1 is repeated on the next unmatched peak. + + 3. Once all peaks from the current frame have been matched, there may still be + peaks remaining in the next frame. A new peak is created in the current frame + at the same frequency and with 0 amplitude, and a match is made.""" + frame_partials = [] + # MQ algorithm needs 2 frames of data, so return if this is the first frame + if not self._current_frame: + self._current_frame = frame + return frame_partials + + for peak in self._current_frame: + if peak.is_start_of_partial(): + partial = simpl.Partial() + partial.starting_frame = frame_number-1 + partial.add_peak(peak) + self.partials.append(partial) + match = self._find_closest_match(peak, frame) + if match: + # is this match closer to any of the other unmatched peaks in frame? + closest_to_candidate = self._find_closest_match(match, self._current_frame, 'forwards') + if not closest_to_candidate == peak: + # see if the closest peak with lower frequency to the candidate is within + # the matching interval + lower_peak = self._get_free_peak_below(match, frame) + if lower_peak: + if abs(lower_peak.frequency - peak.frequency) < self._matching_interval: + # this is the definitive match + self.get_partial(peak.partial_id).add_peak(lower_peak) + else: + self._kill_track(peak, frame) + else: + self._kill_track(peak, frame) + # if not, it is a definitive match + else: + self.get_partial(peak.partial_id).add_peak(match) + else: # no match + self._kill_track(peak, frame) + + # now that all peaks in the current frame have been matched, look for any + # unmatched peaks in the next frame + for p in frame: + if not p.previous_peak: + # create a new track by adding a peak in the current frame, matched to p, + # with amplitude 0 + partial = simpl.Partial() + partial.starting_frame = frame_number-1 + new_peak = simpl.Peak() + new_peak.amplitude = 0 + new_peak.frequency = p.frequency + partial.add_peak(new_peak) + partial.add_peak(p) + self.partials.append(partial) + self._current_frame = frame + return frame_partials + |