summaryrefslogtreecommitdiff
path: root/mq.py
diff options
context:
space:
mode:
authorJohn Glover <glover.john@gmail.com>2011-06-24 18:17:23 +0100
committerJohn Glover <glover.john@gmail.com>2011-06-24 18:17:23 +0100
commit416bd737074a287ea47106c73ea6bcfde40a75a8 (patch)
tree74562303d4f4f2f2e010f7e13cba41dc4852b50c /mq.py
parentd26519464dcbf8c3682348167c29454961facefe (diff)
downloadsimpl-416bd737074a287ea47106c73ea6bcfde40a75a8.tar.gz
simpl-416bd737074a287ea47106c73ea6bcfde40a75a8.tar.bz2
simpl-416bd737074a287ea47106c73ea6bcfde40a75a8.zip
Change to using distutils.
Currently only builds the simplsndobj module
Diffstat (limited to 'mq.py')
-rw-r--r--mq.py375
1 files changed, 0 insertions, 375 deletions
diff --git a/mq.py b/mq.py
deleted file mode 100644
index 340d011..0000000
--- a/mq.py
+++ /dev/null
@@ -1,375 +0,0 @@
-# Copyright (c) 2009 John Glover, National University of Ireland, Maynooth
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-import simpl
-import numpy as np
-import operator as op
-
-def best_match(f, candidates):
- best_diff = 22050.0
- best_freq = 0.0
- pos = 0
- for i, c in enumerate(candidates):
- if abs(f - c) < best_diff:
- best_diff = abs(f - c)
- best_freq = c
- pos = i
- return pos
-
-def TWM(peaks, f_min=0.0, f_max=3000.0, f_step=20.0):
- # TWM parameters
- p = 0.5
- q = 1.4
- r = 0.5
- rho = 0.33
- N = 8
- Err = {}
-
- max_amp = max([x.amplitude for x in peaks])
-
- # remove all peaks with amplitude of less than 10% of max
- # note: this is not in the TWM paper, found that it improved accuracy however
- peaks = [x for x in peaks if x.amplitude >= (max_amp * 0.1)]
-
- # get the max frequency of the remaining peaks
- max_freq = max([x.frequency for x in peaks])
- if max_freq < f_max:
- f_max = max_freq
-
- f_current = f_min
- while f_current < f_max:
- Err_pm = 0.0
- Err_mp = 0.0
- harmonics = np.arange(f_current, f_max, f_current)
- if len(harmonics) > N:
- harmonics = harmonics[0:N]
-
- # calculate mismatch between predicted and actual peaks
- for h in harmonics:
- k = best_match(f_current, [x.frequency for x in peaks])
- f = peaks[k].frequency
- a = peaks[k].amplitude
- Err_pm += abs(h-f) * (h**-p) + (a / max_amp) * ((q * (abs(h-f)) * (h**-p) -r))
-
- # calculate the mismatch between actual and predicted peaks
- for x in peaks:
- k = best_match(x.frequency, harmonics)
- f = harmonics[k]
- xf = x.frequency
- a = x.amplitude # TODO: is this value for 'a' correct?
- Err_mp += abs(xf-f) * (xf**-p) + (a / max_amp) * ((q * (abs(xf-f)) * (xf**-p) -r))
-
- # calculate the total error for f_current as a fundamental frequency
- Err[f_current] = (Err_pm / len(harmonics)) + (rho * Err_mp / len(peaks))
- f_current += f_step
-
- # return the value with the minimum total error
- return min(Err.iteritems(), key=op.itemgetter(1))[0]
-
-
-class MQPeakDetection(simpl.PeakDetection):
- """Peak detection, based on the McAulay and Quatieri (MQ) algorithm.
-
- A peak is defined as the point in the spectrum where the slope changes from
- position to negative. Hamming window is used, window size must be (at least)
- 2.5 times the average pitch. During voiced sections of speech, the window size
- is updated every 0.25 secs, to the average pitch. During unvoiced sections,
- the window size is fixed at the value of the last voiced frame. Once the width
- is specified, the Hamming window is computed and normalised and the STFT is
- calculated."""
- def __init__(self):
- simpl.PeakDetection.__init__(self)
- self._window = simpl.zeros(self._window_size)
- self._create_analysis_window()
- self._fundamental = float(self._sampling_rate) / self._window_size
- self._static_frame_size = False
- self._current_peaks = []
- self._freq_estimates = []
- # no. frames to use to estimate the average pitch (1/4 second window)
- self._avg_freq_frames = int(0.25 * self.sampling_rate / self.frame_size)
-
- def set_frame_size(self, frame_size):
- self._frame_size = frame_size
- self.window_size = frame_size
-
- def set_window_size(self, window_size):
- self._window_size = window_size
- self._fundamental = float(self._sampling_rate) / window_size
- self._create_analysis_window()
-
- def _create_analysis_window(self):
- "Creates the analysis window, a normalised hamming window"
- self._window = np.hamming(self._window_size)
- s = 0
- for i in range(self._window_size):
- s += self._window[i]
- self._window /= s
-
- def get_next_frame_size(self):
- if not len(self._current_peaks):
- return self._frame_size
-
- # frame size must be at least 2.5 times the average pitch period,
- # where the average is taken over 1/4 second.
- # TODO: average should not include frames corresponding to unvoiced speech,
- # ie noisy frames
- self._freq_estimates.append(TWM(self._current_peaks, f_min=self._fundamental,
- f_step=self._fundamental))
- if len(self._freq_estimates) > self._avg_freq_frames:
- self._freq_estimates.pop(0)
-
- avg_freq = sum(self._freq_estimates) / len(self._freq_estimates)
- pitch_period = float(self.sampling_rate) / avg_freq
-
- if self._frame_size < (2.5 * pitch_period):
- return int(2.5 * pitch_period)
- else:
- return self._frame_size
-
- def find_peaks_in_frame(self, frame):
- """Selects the highest peaks from the given spectral frame, up to a maximum of
- self._max_peaks peaks."""
- self._current_peaks = []
- # fft of frame
- f = np.fft.rfft(frame.audio * self._window)
- spectrum = abs(f)
- # find all peaks in the spectrum
- prev_mag = np.abs(spectrum[0])
- current_mag = np.abs(spectrum[1])
- next_mag = 0.0
- for bin in range(2, len(spectrum)-1):
- next_mag = np.abs(spectrum[bin])
- if (current_mag > prev_mag and
- current_mag > next_mag):
- p = simpl.Peak()
- p.amplitude = current_mag
- p.frequency = (bin - 1) * self._fundamental
- p.phase = np.angle(f[bin-1])
- self._current_peaks.append(p)
- prev_mag = current_mag
- current_mag = next_mag
- # sort peaks, largest amplitude first, and up to a max of self.num_peaks peaks
- self._current_peaks.sort(cmp=simpl.compare_peak_amps)
- self._current_peaks.reverse()
- if len(self._current_peaks) > self._max_peaks:
- self._current_peaks = self._current_peaks[0:self._max_peaks]
- # put back into ascending frequency order
- self._current_peaks.sort(cmp=simpl.compare_peak_freqs)
- return self._current_peaks
-
-
-class MQPartialTracking(simpl.PartialTracking):
- "Partial tracking, based on the McAulay and Quatieri (MQ) algorithm"
- def __init__(self):
- simpl.PartialTracking.__init__(self)
- self._matching_interval = 100 # peak matching interval, in Hz
- self._current_frame = None # current frame in the peak tracking algorithm
-
- def _find_closest_match(self, peak, frame, direction='backwards'):
- """Find a candidate match for peak in frame if one exists. This is the closest
- (in frequency) match that is within self._matching_interval."""
- free_peaks = []
- for p in frame:
- if p.is_free(direction):
- free_peaks.append(p)
- distances = [abs(peak.frequency - p.frequency) for p in free_peaks]
- if len(distances):
- min_distance_position = min(xrange(len(distances)), key=distances.__getitem__)
- if min(distances) < self._matching_interval:
- return free_peaks[min_distance_position]
- return None
-
- def _get_free_peak_below(self, peak, frame, direction='backwards'):
- """Returns the closest unmatched peak in frame with a frequency less than peak.frequency."""
- # find peak in frame
- for peak_number, p in enumerate(frame):
- if p == peak:
- # go back through lower peaks (in order) and return the first unmatched
- current_peak = peak_number - 1
- while current_peak >= 0:
- if frame[current_peak].is_free(direction):
- return frame[current_peak]
- current_peak -= 1
- return None
- return None
-
- def _kill_partial(self, partials, peak):
- """When a partial dies it is matched to itself in the next frame, with 0 amplitude."""
- if peak.is_free(): # this may be a 0 amp (dead) peak from a previous tracking step
- next_peak = simpl.Peak()
- next_peak.amplitude = 0.0
- next_peak.frequency = peak.frequency
- self._extend_partial(partials, peak, next_peak)
- #self.get_partial(peak.partial_id).add_peak(next_peak)
-
- def _extend_partial(self, partials, prev_peak, next_peak):
- """Sets next_peak to be the next sinusoidal peak in the partial
- that currently ends with prev_peak."""
- partials[prev_peak.partial_id] = next_peak
- prev_peak.next_peak = next_peak
- next_peak.previous_peak = prev_peak
- next_peak.partial_id = prev_peak.partial_id
-
- def update_partials(self, frame):
- """Streamable (real-time) MQ peak-tracking.
-
- 1. If there is no peak within the matching interval, the track dies.
- If there is at least one peak within the matching interval, the closest match
- is declared a candidate match.
-
- 2. If there is a candidate match from step 1, and it is not a closer match to
- any of the remaining unmatched frequencies, it is declared a definitive match.
- If the candidate match has a closer unmatched peak in the previous frame, it is
- not selected. Instead, the closest lower frequency peak to the candidate is
- checked. If it is within the matching interval, it is selected as a definitive
- match. If not, the track dies.
- In any case, step 1 is repeated on the next unmatched peak.
-
- 3. Once all peaks from the current frame have been matched, there may still be
- peaks remaining in the next frame. A new peak is created in the current frame
- at the same frequency and with 0 amplitude, and a match is made."""
- partials = [None for i in range(self.max_partials)]
- # MQ algorithm needs 2 frames of data, so create new partials and
- # return if this is the first frame
- if not self._current_frame:
- self._current_frame = frame
- # if more peaks than paritals, select the max_partials largest
- # amplitude peaks in frame
- if len(frame.peaks) > self.max_partials:
- frame.peaks.sort(cmp=simpl.compare_peak_amps)
- frame.peaks.reverse()
- partials = frame.peaks[0:self.max_partials]
- # if not, save all peaks as new partials, and add a few zero
- # peaks if necessary
- else:
- partials = frame.peaks
- for i in range(len(frame.peaks), self.max_partials):
- partials.append(simpl.Peak())
- # give the new partials a partial ID
- for i in range(self.max_partials):
- partials[i].partial_id = i
- return partials
-
- for peak in self._current_frame.partials:
- match = self._find_closest_match(peak, frame.peaks)
- if match:
- # is this match closer to any of the other unmatched peaks in frame?
- closest_to_candidate = self._find_closest_match(match, self._current_frame.partials, 'forwards')
- if not closest_to_candidate == peak:
- # see if the closest peak with lower frequency to the candidate is within
- # the matching interval
- lower_peak = self._get_free_peak_below(match, frame.peaks)
- if lower_peak:
- if abs(lower_peak.frequency - peak.frequency) < self._matching_interval:
- # this is the definitive match
- #self.get_partial(peak.partial_id).add_peak(lower_peak)
- self._extend_partial(partials, peak, lower_peak)
- else:
- self._kill_partial(partials, peak)
- else:
- self._kill_partial(partials, peak)
- # if not, it is a definitive match
- else:
- #self.get_partial(peak.partial_id).add_peak(match)
- self._extend_partial(partials, peak, match)
- else: # no match
- self._kill_partial(partials, peak)
-
- # now that all peaks in the current frame have been matched, look for any
- # unmatched peaks in the next frame
- for p in frame.peaks:
- if not p.previous_peak:
- # look for a free partial ID in the current partials
- free_partial = None
- for i in range(self.max_partials):
- if not partials[i]:
- free_partial = i
- break
- if free_partial:
- # create a new track by adding a peak in the current frame,
- # matched to p, with amplitude 0
- new_peak = simpl.Peak()
- new_peak.frequency = p.frequency
- new_peak.partial_id = free_partial
- #partial.add_peak(new_peak)
- #partial.add_peak(p)
- self._extend_partial(partials, new_peak, p)
-
- # add zero peaks for any remaining free partials
- for i in range(self.max_partials):
- if not partials[i]:
- p = simpl.Peak()
- p.partial_id = i
- partials[i] = p
- self._current_frame = frame
- return partials
-
-
-class MQSynthesis(simpl.Synthesis):
- def __init__(self):
- simpl.Synthesis.__init__(self)
- self._current_frame = simpl.zeros(self.frame_size)
- self._previous_partials = [simpl.Peak() for i in range(self.max_partials)]
-
- def hz_to_radians(self, frequency):
- if not frequency:
- return 0.0
- else:
- return (frequency * 2.0 * np.pi) / self.sampling_rate
-
- def synth_frame(self, frame):
- "Synthesises a frame of audio, given a list of peaks from tracks"
- self._current_frame *= 0.0
-
- for n, p in enumerate(frame.partials):
- # get values for last amplitude, frequency and phase
- # these are the initial values of the instantaneous amplitude/frequency/phase
- current_freq = self.hz_to_radians(p.frequency)
- prev_amp = self._previous_partials[n].amplitude
- if prev_amp == 0:
- prev_freq = current_freq
- prev_phase = p.phase - (current_freq * self.frame_size)
- while prev_phase >= np.pi: prev_phase -= 2.0 * np.pi
- while prev_phase < -np.pi: prev_phase += 2.0 * np.pi
- else:
- prev_freq = self.hz_to_radians(self._previous_partials[n].frequency)
- prev_phase = self._previous_partials[n].phase
-
- # amplitudes are linearly interpolated between frames
- inst_amp = prev_amp
- amp_inc = (p.amplitude - prev_amp) / self.frame_size
-
- # freqs/phases are calculated by cubic interpolation
- freq_diff = current_freq - prev_freq
- x = ((prev_phase + (prev_freq * self.frame_size) - p.phase) +
- (freq_diff * (self.frame_size / 2.0)))
- x /= (2.0 * np.pi)
- m = int(np.round(x))
- phase_diff = p.phase - prev_phase - (prev_freq * self.frame_size) + (2.0 * np.pi * m)
- alpha = ((3.0 / (self.frame_size**2)) * phase_diff) - (freq_diff / self.frame_size)
- beta = ((-2.0 / (self.frame_size**3)) * phase_diff) + (freq_diff / (self.frame_size**2))
-
- # calculate output samples
- for i in range(self.frame_size):
- inst_amp += amp_inc
- inst_phase = prev_phase + (prev_freq * i) + (alpha * (i**2)) + (beta * (i**3))
- self._current_frame[i] += (2.0 * inst_amp) * np.cos(inst_phase)
-
- # update previous partials list
- self._previous_partials[n] = p
-
- return self._current_frame
-