diff options
author | John Glover <j@johnglover.net> | 2012-06-28 16:50:58 +0100 |
---|---|---|
committer | John Glover <j@johnglover.net> | 2012-06-28 16:50:58 +0100 |
commit | 1a4d16e36a534f699bba9a42ff4f9d23cda1628d (patch) | |
tree | 213a325318f93b4f32e44e9d468bd70bf57bcdeb | |
parent | 7ad9d0fc803e3a778ec3cdcdc1a0537e6a635a02 (diff) | |
download | simpl-1a4d16e36a534f699bba9a42ff4f9d23cda1628d.tar.gz simpl-1a4d16e36a534f699bba9a42ff4f9d23cda1628d.tar.bz2 simpl-1a4d16e36a534f699bba9a42ff4f9d23cda1628d.zip |
Rename basetypes to pybase, PEP8 cleanup.
-rw-r--r-- | simpl/__init__.py | 48 | ||||
-rw-r--r-- | simpl/audio.py | 10 | ||||
-rw-r--r-- | simpl/examples/plotpeaks.py | 7 | ||||
-rw-r--r-- | simpl/fx.py | 3 | ||||
-rw-r--r-- | simpl/loris.py | 20 | ||||
-rw-r--r-- | simpl/lp.py | 26 | ||||
-rw-r--r-- | simpl/mq.py | 188 | ||||
-rw-r--r-- | simpl/plot/__init__.py | 39 | ||||
-rw-r--r-- | simpl/plot/colours.py | 7 | ||||
-rw-r--r-- | simpl/pybase.py (renamed from simpl/basetypes.py) | 189 | ||||
-rw-r--r-- | simpl/sms.py | 244 | ||||
-rw-r--r-- | simpl/sndobj.py | 93 |
12 files changed, 455 insertions, 419 deletions
diff --git a/simpl/__init__.py b/simpl/__init__.py index 66e5bfe..66ede7c 100644 --- a/simpl/__init__.py +++ b/simpl/__init__.py @@ -1,20 +1,36 @@ -from basetypes import Frame, Peak, Partial -from basetypes import PeakDetection, PartialTracking, Synthesis, Residual -from basetypes import compare_peak_amps, compare_peak_freqs -from sndobj import SndObjPeakDetection, SndObjPartialTracking, SndObjSynthesis -from sms import SMSPeakDetection, SMSPartialTracking, SMSSynthesis, SMSResidual -# from loris import LorisPeakDetection, LorisPartialTracking, LorisSynthesis -from mq import MQPeakDetection, MQPartialTracking, MQSynthesis -from plot import plot_peaks, plot_partials -from audio import read_wav - import numpy as np +dtype = np.double + +import pybase +Frame = pybase.Frame +Peak = pybase.Peak +Partial = pybase.Partial +PeakDetection = pybase.PeakDetection +PartialTracking = pybase.PartialTracking +Synthesis = pybase.Synthesis +Residual = pybase.Residual +compare_peak_amps = pybase.compare_peak_amps +compare_peak_freqs = pybase.compare_peak_freqs + +import sndobj +SndObjPeakDetection = sndobj.SndObjPeakDetection +SndObjPartialTracking = sndobj.SndObjPartialTracking +SndObjSynthesis = sndobj.SndObjSynthesis + +import sms +SMSPeakDetection = sms.SMSPeakDetection +SMSPartialTracking = sms.SMSPartialTracking +SMSSynthesis = sms.SMSSynthesis +SMSResidual = sms.SMSResidual -def array (n, type=float): - return(np.array(n, dtype=type)) +import mq +MQPeakDetection = mq.MQPeakDetection +MQPartialTracking = mq.MQPartialTracking +MQSynthesis = mq.MQSynthesis -def asarray (n, type=float): - return(np.asarray(n, dtype=type)) +import plot +plot_peaks = plot.plot_peaks +plot_partials = plot.plot_partials -def zeros (n, type=float): - return(np.zeros(n, dtype=type)) +import audio +read_wav = audio.read_wav diff --git a/simpl/audio.py b/simpl/audio.py index 99bc439..d86c5ff 100644 --- a/simpl/audio.py +++ b/simpl/audio.py @@ -1,9 +1,9 @@ import numpy as np -from scipy.io.wavfile import read, write +import scipy.io.wavfile as wav import simpl -def read_wav(file): - audio_data = read(file) - # return floating point values between -1 and 1 - return simpl.asarray(audio_data[1]) / 32768.0, audio_data[0] +def read_wav(file): + 'return floating point values between -1 and 1' + audio = wav.read(file) + return np.asarray(audio[1], dtype=simpl.dtype) / 32768.0, audio[0] diff --git a/simpl/examples/plotpeaks.py b/simpl/examples/plotpeaks.py index 67cfb1b..66a538f 100644 --- a/simpl/examples/plotpeaks.py +++ b/simpl/examples/plotpeaks.py @@ -1,18 +1,17 @@ import simpl import matplotlib.pyplot as plt -from scipy.io.wavfile import read input_file = '../../tests/audio/flute.wav' -audio_data = read(input_file) -audio = simpl.asarray(audio_data[1]) / 32768.0 # values between -1 and 1 -sample_rate = audio_data[0] +audio = simpl.read_wav(input_file)[0] # take just the first few frames audio = audio[0:4096] + # peak detection using the SndObj library pd = simpl.SndObjPeakDetection() pd.max_peaks = 20 peaks = pd.find_peaks(audio) + # plot peaks using matplotlib simpl.plot.plot_peaks(peaks) plt.show() diff --git a/simpl/fx.py b/simpl/fx.py index d73d50d..a3ba065 100644 --- a/simpl/fx.py +++ b/simpl/fx.py @@ -1,11 +1,12 @@ from simpl import Partial import numpy as np + def time_stretch(partials, factor): """Time stretch partials by factor.""" stretched_partials = [] step_size = 1.0 / factor - + for partial in partials: stretched_partial = Partial() stretched_partial.starting_frame = partial.starting_frame * factor diff --git a/simpl/loris.py b/simpl/loris.py deleted file mode 100644 index b8156ac..0000000 --- a/simpl/loris.py +++ /dev/null @@ -1,20 +0,0 @@ -import numpy as np -import simpl -from simpl.simplloris import LorisPeakDetection - -# class LorisPeakDetection(simpl.PeakDetection): -# "Sinusoidal peak detection using Loris" -# def __init__(self): -# simpl.PeakDetection.__init__(self) - - -class LorisPartialTracking(simpl.PartialTracking): - "Partial tracking using the algorithm from Loris" - def __init__(self): - simpl.PartialTracking.__init__(self) - - -class LorisSynthesis(simpl.Synthesis): - "Sinusoidal resynthesis using Loris" - def __init__(self, synthesis_type='adsyn'): - simpl.Synthesis.__init__(self) diff --git a/simpl/lp.py b/simpl/lp.py index b458c69..294de93 100644 --- a/simpl/lp.py +++ b/simpl/lp.py @@ -1,6 +1,7 @@ import simpl import numpy as np + def burg(signal, order): """Using the burg method, calculate order coefficients. Returns a numpy array.""" @@ -15,21 +16,22 @@ def burg(signal, order): # fk is f without the first element fk = f[1:] # bk is b without the last element - bk = b[0:b.size-1] + bk = b[0:b.size - 1] # calculate mu - if sum((fk*fk)+(bk*bk)): + if sum((fk * fk) + (bk * bk)): # check for division by zero - mu = -2.0 * sum(fk*bk) / sum((fk*fk)+(bk*bk)) + mu = -2.0 * sum(fk * bk) / sum((fk * fk) + (bk * bk)) else: mu = 0.0 # update coefs # coefs[::-1] just reverses the coefs array - coefs = np.hstack((coefs,0)) + (mu * np.hstack((0, coefs[::-1]))) + coefs = np.hstack((coefs, 0)) + (mu * np.hstack((0, coefs[::-1]))) # update f and b - f = fk + (mu*bk) - b = bk + (mu*fk) + f = fk + (mu * bk) + b = bk + (mu * fk) return coefs[1:] + def predict(signal, coefs, num_predictions): """Using Linear Prediction, return the estimated next num_predictions values of signal, using the given coefficients. @@ -37,34 +39,32 @@ def predict(signal, coefs, num_predictions): predictions = np.zeros(num_predictions) past_samples = np.zeros(len(coefs)) for i in range(len(coefs)): - past_samples[i] = signal[-1-i] + past_samples[i] = signal[-1 - i] sample_pos = 0 for i in range(num_predictions): # each sample in past_samples is multiplied by a coefficient # results are summed for j in range(len(coefs)): - predictions[i] -= coefs[j] * past_samples[(j+sample_pos) % len(coefs)] + predictions[i] -= coefs[j] * past_samples[(j + sample_pos) % len(coefs)] sample_pos -= 1 if sample_pos < 0: sample_pos = len(coefs) - 1 - past_samples[sample_pos] = predictions[i] + past_samples[sample_pos] = predictions[i] return predictions class LPPartialTracking(simpl.PartialTracking): - "Partial tracking, based on the McAulay and Quatieri (MQ) algorithm" + "Partial tracking, based on the McAulay and Quatieri (MQ) algorithm" def __init__(self): simpl.PartialTracking.__init__(self) self._matching_interval = 100 # peak matching interval, in Hz - def update_partials(self, frame, frame_number): """Streamable (real-time) LP peak-tracking. """ frame_partials = [] - + for peak in frame: pass return frame_partials - diff --git a/simpl/mq.py b/simpl/mq.py index 580966a..db3e82d 100644 --- a/simpl/mq.py +++ b/simpl/mq.py @@ -2,17 +2,17 @@ import simpl import numpy as np import operator as op + def best_match(f, candidates): best_diff = 22050.0 - best_freq = 0.0 pos = 0 for i, c in enumerate(candidates): if abs(f - c) < best_diff: best_diff = abs(f - c) - best_freq = c pos = i return pos + def TWM(peaks, f_min=0.0, f_max=3000.0, f_step=20.0): # TWM parameters p = 0.5 @@ -25,7 +25,8 @@ def TWM(peaks, f_min=0.0, f_max=3000.0, f_step=20.0): max_amp = max([x.amplitude for x in peaks]) # remove all peaks with amplitude of less than 10% of max - # note: this is not in the TWM paper, found that it improved accuracy however + # note: this is not in the TWM paper, found that it improved + # accuracy however peaks = [x for x in peaks if x.amplitude >= (max_amp * 0.1)] # get the max frequency of the remaining peaks @@ -46,37 +47,39 @@ def TWM(peaks, f_min=0.0, f_max=3000.0, f_step=20.0): k = best_match(f_current, [x.frequency for x in peaks]) f = peaks[k].frequency a = peaks[k].amplitude - Err_pm += abs(h-f) * (h**-p) + (a / max_amp) * ((q * (abs(h-f)) * (h**-p) -r)) + Err_pm += abs(h - f) * (h ** -p) + (a / max_amp) * ((q * (abs(h - f)) * (h ** -p) - r)) # calculate the mismatch between actual and predicted peaks for x in peaks: k = best_match(x.frequency, harmonics) f = harmonics[k] xf = x.frequency - a = x.amplitude # TODO: is this value for 'a' correct? - Err_mp += abs(xf-f) * (xf**-p) + (a / max_amp) * ((q * (abs(xf-f)) * (xf**-p) -r)) + a = x.amplitude + Err_mp += abs(xf - f) * (xf ** -p) + (a / max_amp) * ((q * (abs(xf - f)) * (xf ** -p) - r)) # calculate the total error for f_current as a fundamental frequency Err[f_current] = (Err_pm / len(harmonics)) + (rho * Err_mp / len(peaks)) f_current += f_step - + # return the value with the minimum total error return min(Err.iteritems(), key=op.itemgetter(1))[0] class MQPeakDetection(simpl.PeakDetection): - """Peak detection, based on the McAulay and Quatieri (MQ) algorithm. - + """ + Peak detection, based on the McAulay and Quatieri (MQ) algorithm. + A peak is defined as the point in the spectrum where the slope changes from - position to negative. Hamming window is used, window size must be (at least) - 2.5 times the average pitch. During voiced sections of speech, the window size - is updated every 0.25 secs, to the average pitch. During unvoiced sections, - the window size is fixed at the value of the last voiced frame. Once the width - is specified, the Hamming window is computed and normalised and the STFT is - calculated.""" + position to negative. Hamming window is used, window size must be + (at least) 2.5 times the average pitch. During voiced sections of speech, + the window size is updated every 0.25 secs, to the average pitch. + During unvoiced sections, the window size is fixed at the value of the + last voiced frame. Once the width is specified, the Hamming window is + computed and normalised and the STFT is calculated. + """ def __init__(self): simpl.PeakDetection.__init__(self) - self._window = simpl.zeros(self._window_size) + self._window = np.zeros(self._window_size, dtype=simpl.dtype) self._create_analysis_window() self._fundamental = float(self._sampling_rate) / self._window_size self._static_frame_size = False @@ -88,12 +91,12 @@ class MQPeakDetection(simpl.PeakDetection): def set_frame_size(self, frame_size): self._frame_size = frame_size self.window_size = frame_size - + def set_window_size(self, window_size): self._window_size = window_size self._fundamental = float(self._sampling_rate) / window_size self._create_analysis_window() - + def _create_analysis_window(self): "Creates the analysis window, a normalised hamming window" self._window = np.hamming(self._window_size) @@ -108,10 +111,12 @@ class MQPeakDetection(simpl.PeakDetection): # frame size must be at least 2.5 times the average pitch period, # where the average is taken over 1/4 second. - # TODO: average should not include frames corresponding to unvoiced speech, - # ie noisy frames - self._freq_estimates.append(TWM(self._current_peaks, f_min=self._fundamental, - f_step=self._fundamental)) + # TODO: average should not include frames corresponding to unvoiced + # speech, ie noisy frames + self._freq_estimates.append( + TWM(self._current_peaks, f_min=self._fundamental, + f_step=self._fundamental) + ) if len(self._freq_estimates) > self._avg_freq_frames: self._freq_estimates.pop(0) @@ -122,11 +127,13 @@ class MQPeakDetection(simpl.PeakDetection): return int(2.5 * pitch_period) else: return self._frame_size - + def find_peaks_in_frame(self, frame): - """Selects the highest peaks from the given spectral frame, up to a maximum of - self._max_peaks peaks.""" - self._current_peaks = [] + """ + Selects the highest peaks from the given spectral frame, up to a + maximum of self._max_peaks peaks. + """ + self._current_peaks = [] # fft of frame f = np.fft.rfft(frame.audio * self._window) spectrum = abs(f) @@ -134,19 +141,22 @@ class MQPeakDetection(simpl.PeakDetection): prev_mag = np.abs(spectrum[0]) current_mag = np.abs(spectrum[1]) next_mag = 0.0 - for bin in range(2, len(spectrum)-1): + + for bin in range(2, len(spectrum) - 1): next_mag = np.abs(spectrum[bin]) - if (current_mag > prev_mag and + if (current_mag > prev_mag and current_mag > next_mag): p = simpl.Peak() p.amplitude = current_mag p.frequency = (bin - 1) * self._fundamental - p.phase = np.angle(f[bin-1]) + p.phase = np.angle(f[bin - 1]) self._current_peaks.append(p) prev_mag = current_mag current_mag = next_mag - # sort peaks, largest amplitude first, and up to a max of self.num_peaks peaks - self._current_peaks.sort(cmp=simpl.compare_peak_amps) + + # sort peaks, largest amplitude first, and up to a max + # of self.num_peaks peaks + self._current_peaks.sort(cmp=simpl.compare_peak_amps) self._current_peaks.reverse() if len(self._current_peaks) > self._max_peaks: self._current_peaks = self._current_peaks[0:self._max_peaks] @@ -156,15 +166,18 @@ class MQPeakDetection(simpl.PeakDetection): class MQPartialTracking(simpl.PartialTracking): - "Partial tracking, based on the McAulay and Quatieri (MQ) algorithm" + "Partial tracking, based on the McAulay and Quatieri (MQ) algorithm" def __init__(self): simpl.PartialTracking.__init__(self) self._matching_interval = 100 # peak matching interval, in Hz - self._current_frame = None # current frame in the peak tracking algorithm + self._current_frame = None # current frame in peak tracking def _find_closest_match(self, peak, frame, direction='backwards'): - """Find a candidate match for peak in frame if one exists. This is the closest - (in frequency) match that is within self._matching_interval.""" + """ + Find a candidate match for peak in frame if one exists. + This is the closest (in frequency) match that is within + self._matching_interval. + """ free_peaks = [] for p in frame: if p.is_free(direction): @@ -175,13 +188,17 @@ class MQPartialTracking(simpl.PartialTracking): if min(distances) < self._matching_interval: return free_peaks[min_distance_position] return None - + def _get_free_peak_below(self, peak, frame, direction='backwards'): - """Returns the closest unmatched peak in frame with a frequency less than peak.frequency.""" + """ + Returns the closest unmatched peak in frame with a frequency less + than peak.frequency. + """ # find peak in frame for peak_number, p in enumerate(frame): if p == peak: - # go back through lower peaks (in order) and return the first unmatched + # go back through lower peaks (in order) and + # return the first unmatched current_peak = peak_number - 1 while current_peak >= 0: if frame[current_peak].is_free(direction): @@ -189,10 +206,15 @@ class MQPartialTracking(simpl.PartialTracking): current_peak -= 1 return None return None - + def _kill_partial(self, partials, peak): - """When a partial dies it is matched to itself in the next frame, with 0 amplitude.""" - if peak.is_free(): # this may be a 0 amp (dead) peak from a previous tracking step + """ + When a partial dies it is matched to itself in the next frame, + with 0 amplitude. + """ + if peak.is_free(): + # check that peak is free as this may be a 0 amp (dead) peak from + # a previous tracking step next_peak = simpl.Peak() next_peak.amplitude = 0.0 next_peak.frequency = peak.frequency @@ -208,32 +230,35 @@ class MQPartialTracking(simpl.PartialTracking): next_peak.partial_id = prev_peak.partial_id def update_partials(self, frame): - """Streamable (real-time) MQ peak-tracking. - + """ + Streamable (real-time) MQ peak-tracking. + 1. If there is no peak within the matching interval, the track dies. - If there is at least one peak within the matching interval, the closest match - is declared a candidate match. - - 2. If there is a candidate match from step 1, and it is not a closer match to - any of the remaining unmatched frequencies, it is declared a definitive match. - If the candidate match has a closer unmatched peak in the previous frame, it is - not selected. Instead, the closest lower frequency peak to the candidate is - checked. If it is within the matching interval, it is selected as a definitive - match. If not, the track dies. - In any case, step 1 is repeated on the next unmatched peak. - - 3. Once all peaks from the current frame have been matched, there may still be - peaks remaining in the next frame. A new peak is created in the current frame - at the same frequency and with 0 amplitude, and a match is made.""" + If there is at least one peak within the matching interval, the closest + match is declared a candidate match. + + 2. If there is a candidate match from step 1, and it is not a closer + match to any of the remaining unmatched frequencies, it is declared a + definitive match. If the candidate match has a closer unmatched peak + in the previous frame, it is not selected. Instead, the closest lower + frequency peak to the candidate is checked. If it is within the + matching interval, it is selected as a definitive match. If not, the + track dies. In any case, step 1 is repeated on the next unmatched peak. + + 3. Once all peaks from the current frame have been matched, there may + still be peaks remaining in the next frame. A new peak is created in + the current frame at the same frequency and with 0 amplitude, and a + match is made. + """ partials = [None for i in range(self.max_partials)] # MQ algorithm needs 2 frames of data, so create new partials and # return if this is the first frame if not self._current_frame: self._current_frame = frame - # if more peaks than paritals, select the max_partials largest + # if more peaks than paritals, select the max_partials largest # amplitude peaks in frame if len(frame.peaks) > self.max_partials: - frame.peaks.sort(cmp=simpl.compare_peak_amps) + frame.peaks.sort(cmp=simpl.compare_peak_amps) frame.peaks.reverse() partials = frame.peaks[0:self.max_partials] # if not, save all peaks as new partials, and add a few zero @@ -246,20 +271,22 @@ class MQPartialTracking(simpl.PartialTracking): for i in range(self.max_partials): partials[i].partial_id = i return partials - + for peak in self._current_frame.partials: match = self._find_closest_match(peak, frame.peaks) if match: - # is this match closer to any of the other unmatched peaks in frame? - closest_to_candidate = self._find_closest_match(match, self._current_frame.partials, 'forwards') + # is this match closer to any of the other unmatched + # peaks in frame? + closest_to_candidate = self._find_closest_match( + match, self._current_frame.partials, 'forwards' + ) if not closest_to_candidate == peak: - # see if the closest peak with lower frequency to the candidate is within - # the matching interval + # see if the closest peak with lower frequency to + # the candidate is within the matching interval lower_peak = self._get_free_peak_below(match, frame.peaks) if lower_peak: if abs(lower_peak.frequency - peak.frequency) < self._matching_interval: # this is the definitive match - #self.get_partial(peak.partial_id).add_peak(lower_peak) self._extend_partial(partials, peak, lower_peak) else: self._kill_partial(partials, peak) @@ -267,13 +294,12 @@ class MQPartialTracking(simpl.PartialTracking): self._kill_partial(partials, peak) # if not, it is a definitive match else: - #self.get_partial(peak.partial_id).add_peak(match) self._extend_partial(partials, peak, match) else: # no match self._kill_partial(partials, peak) - - # now that all peaks in the current frame have been matched, look for any - # unmatched peaks in the next frame + + # now that all peaks in the current frame have been matched, + # look for any unmatched peaks in the next frame for p in frame.peaks: if not p.previous_peak: # look for a free partial ID in the current partials @@ -283,13 +309,11 @@ class MQPartialTracking(simpl.PartialTracking): free_partial = i break if free_partial: - # create a new track by adding a peak in the current frame, + # create a new track by adding a peak in the current frame, # matched to p, with amplitude 0 new_peak = simpl.Peak() new_peak.frequency = p.frequency new_peak.partial_id = free_partial - #partial.add_peak(new_peak) - #partial.add_peak(p) self._extend_partial(partials, new_peak, p) # add zero peaks for any remaining free partials @@ -298,14 +322,14 @@ class MQPartialTracking(simpl.PartialTracking): p = simpl.Peak() p.partial_id = i partials[i] = p - self._current_frame = frame + self._current_frame = frame return partials class MQSynthesis(simpl.Synthesis): def __init__(self): simpl.Synthesis.__init__(self) - self._current_frame = simpl.zeros(self.frame_size) + self._current_frame = np.zeros(self.frame_size, dtype=simpl.dtype) self._previous_partials = [simpl.Peak() for i in range(self.max_partials)] def hz_to_radians(self, frequency): @@ -313,21 +337,24 @@ class MQSynthesis(simpl.Synthesis): return 0.0 else: return (frequency * 2.0 * np.pi) / self.sampling_rate - + def synth_frame(self, frame): "Synthesises a frame of audio, given a list of peaks from tracks" self._current_frame *= 0.0 for n, p in enumerate(frame.partials): # get values for last amplitude, frequency and phase - # these are the initial values of the instantaneous amplitude/frequency/phase + # these are the initial values of the instantaneous + # amplitude/frequency/phase current_freq = self.hz_to_radians(p.frequency) prev_amp = self._previous_partials[n].amplitude if prev_amp == 0: - prev_freq = current_freq + prev_freq = current_freq prev_phase = p.phase - (current_freq * self.frame_size) - while prev_phase >= np.pi: prev_phase -= 2.0 * np.pi - while prev_phase < -np.pi: prev_phase += 2.0 * np.pi + while prev_phase >= np.pi: + prev_phase -= 2.0 * np.pi + while prev_phase < -np.pi: + prev_phase += 2.0 * np.pi else: prev_freq = self.hz_to_radians(self._previous_partials[n].frequency) prev_phase = self._previous_partials[n].phase @@ -349,11 +376,10 @@ class MQSynthesis(simpl.Synthesis): # calculate output samples for i in range(self.frame_size): inst_amp += amp_inc - inst_phase = prev_phase + (prev_freq * i) + (alpha * (i**2)) + (beta * (i**3)) + inst_phase = prev_phase + (prev_freq * i) + (alpha * (i ** 2)) + (beta * (i ** 3)) self._current_frame[i] += (2.0 * inst_amp) * np.cos(inst_phase) # update previous partials list self._previous_partials[n] = p return self._current_frame - diff --git a/simpl/plot/__init__.py b/simpl/plot/__init__.py index d328886..a7aad8e 100644 --- a/simpl/plot/__init__.py +++ b/simpl/plot/__init__.py @@ -2,6 +2,7 @@ import simpl import matplotlib.pyplot as plt import colours + def plot_frame_peaks(peaks): "Plot peaks in one frame" x_values = [] @@ -11,13 +12,15 @@ def plot_frame_peaks(peaks): y_values.append(peak.amplitude) plt.plot(x_values, y_values, 'ro') + def _plot_frame_peaks(peaks, frame_number, max_amp=0): "Plot one frame, which is a list of Peak objects" for peak in peaks: plt.plot(frame_number, int(peak.frequency), linestyle="None", marker="o", markersize=2, markeredgewidth=None, - markerfacecolor=colours.pbj(peak.amplitude/max_amp)) - + markerfacecolor=colours.pbj(peak.amplitude / max_amp)) + + def plot_peaks(frames): "Plot peaks found by a peak detection algorithm" # Get the maximum peak amplitude, used to select an appropriate @@ -33,7 +36,8 @@ def plot_peaks(frames): for frame_number, frame in enumerate(frames): _plot_frame_peaks(frame.peaks, frame_number, max_amp) - + + def plot_partials(frames, show_peaks=False): "Plot partials created by a partial tracking algorithm" # Get the maximum peak amplitude, used to select an appropriate @@ -70,22 +74,21 @@ def plot_partials(frames, show_peaks=False): for p in live_partials: if p: partials.append(p) - + peaks = [[] for f in range(num_frames)] for partial in partials: - x_values = [] - y_values = [] - avg_amp = 0.0 - num_peaks = 0 - for peak_number, peak in enumerate(partial.peaks): - x_values.append(partial.starting_frame + peak_number) - y_values.append(int(peak.frequency)) - avg_amp += peak.amplitude - num_peaks += 1 - peaks[partial.starting_frame + peak_number].append(peak) - avg_amp /= num_peaks - plt.plot(x_values, y_values, color=colours.pbj(avg_amp/max_amp)) + x_values = [] + y_values = [] + avg_amp = 0.0 + num_peaks = 0 + for peak_number, peak in enumerate(partial.peaks): + x_values.append(partial.starting_frame + peak_number) + y_values.append(int(peak.frequency)) + avg_amp += peak.amplitude + num_peaks += 1 + peaks[partial.starting_frame + peak_number].append(peak) + avg_amp /= num_peaks + plt.plot(x_values, y_values, color=colours.pbj(avg_amp / max_amp)) if show_peaks: - plot_peaks(frames) - + plot_peaks(frames) diff --git a/simpl/plot/colours.py b/simpl/plot/colours.py index e184341..5efc86b 100644 --- a/simpl/plot/colours.py +++ b/simpl/plot/colours.py @@ -207,13 +207,15 @@ pbj_colours = [ [0.820, 0.357, 0.094] ] + def classic(heat): if heat < 0.0: return classic_colours[0] elif heat > 1.0: return classic_colours[-1] else: - return classic_colours[int(heat * (len(classic_colours)-1))] + return classic_colours[int(heat * (len(classic_colours) - 1))] + def pbj(heat): if heat < 0.0: @@ -221,5 +223,4 @@ def pbj(heat): elif heat > 1.0: return pbj_colours[-1] else: - return pbj_colours[int(heat * (len(pbj_colours)-1))] - + return pbj_colours[int(heat * (len(pbj_colours) - 1))] diff --git a/simpl/basetypes.py b/simpl/pybase.py index 182be21..0928087 100644 --- a/simpl/basetypes.py +++ b/simpl/pybase.py @@ -1,6 +1,7 @@ import simpl import numpy as np + class Peak(object): "A spectral peak" def __init__(self): @@ -12,12 +13,15 @@ class Peak(object): self.partial_id = None self.partial_position = None self.frame_number = None - + def is_start_of_partial(self): return self.previous_peak is None - + def is_free(self, direction='forwards'): - "Returns true iff this peak is unmatched in the given direction, and has positive amplitude" + """ + Returns true iff this peak is unmatched in the given direction, + and has positive amplitude. + """ if self.amplitude <= 0: return False if direction == 'forwards': @@ -29,20 +33,23 @@ class Peak(object): else: return False return True - - + + def compare_peak_amps(peak_x, peak_y): - """Compares two peaks, and returns 1, 0 or -1 if the first has a greater + """ + Compares two peaks, and returns 1, 0 or -1 if the first has a greater amplitude than the second, they have the same amplitude, or the second has a greater amplitude than the first respectively. - Can be used to sort lists of peaks.""" + Can be used to sort lists of peaks. + """ if peak_x.amplitude > peak_y.amplitude: return 1 elif peak_x.amplitude < peak_y.amplitude: return -1 else: return 0 - + + def compare_peak_freqs(peak_x, peak_y): """Compares two peaks, and returns 1, 0 or -1 if the first has a greater frequency than the second, they have the same frequency, or the second has @@ -57,9 +64,9 @@ def compare_peak_freqs(peak_x, peak_y): class Partial(object): - "Represents a sinuoidal partial or track, an ordered sequence of Peaks" + "Represents a sinuoidal partial or track, an ordered sequence of Peaks" _num_partials = 0 - + def __init__(self): "Initialise peaks list and increment partial_id" self.peaks = [] @@ -67,7 +74,7 @@ class Partial(object): self.partial_number = -1 self.partial_id = Partial._num_partials Partial._num_partials += 1 - + def add_peak(self, peak): "Add peak to this partial, setting its id and partial_id." partial_position = len(self.peaks) @@ -79,21 +86,21 @@ class Partial(object): peak.partial_position = partial_position peak.partial_id = self.partial_id peak.partial_number = self.partial_number - + def get_length(self): "Return the length of this partial (as a number of frames)" return len(self.peaks) - + def get_last_frame(self): "Return the frame number of the last frame in this partial" return self.starting_frame + self.get_length() - + def get_last_peak(self): "Return the last peak of this partial" if self.peaks: return self.peaks[-1] return None - + def list_peaks(self): "A generator that returns the peaks in this partial" for peak in self.peaks: @@ -101,14 +108,16 @@ class Partial(object): class Frame(object): - """Represents a frame of audio information. - This can be: - raw audio samples - - an unordered list of sinusoidal peaks + """ + Represents a frame of audio information. + This can be: - raw audio samples + - an unordered list of sinusoidal peaks - an ordered list of partials - synthesised audio samples - residual samples - - synthesised residual samples""" - + - synthesised residual samples + """ + def __init__(self): self._size = 512 self._max_partials = 100 @@ -122,7 +131,7 @@ class Frame(object): class PeakDetection(object): "Detect spectral peaks" - + def __init__(self): self._sampling_rate = 44100 self._frame_size = 2048 @@ -131,9 +140,9 @@ class PeakDetection(object): self._max_peaks = 100 self._window_type = "hamming" self._window_size = 2048 - self._min_peak_separation = 1.0 # in Hz + self._min_peak_separation = 1.0 # in Hz self.frames = [] - + # properties sampling_rate = property(lambda self: self.get_sampling_rate(), lambda self, x: self.set_sampling_rate(x)) @@ -147,43 +156,43 @@ class PeakDetection(object): lambda self, x: self.set_window_type(x)) window_size = property(lambda self: self.get_window_size(), lambda self, x: self.set_window_size(x)) - + def get_sampling_rate(self): return self._sampling_rate - + def set_sampling_rate(self, sampling_rate): self._sampling_rate = sampling_rate - + def get_frame_size(self): return self._frame_size - + def set_frame_size(self, frame_size): self._frame_size = frame_size - + def get_hop_size(self): return self._hop_size - + def set_hop_size(self, hop_size): self._hop_size = hop_size - + def get_max_peaks(self): return self._max_peaks - + def set_max_peaks(self, max_peaks): self._max_peaks = max_peaks - + def get_window_type(self): return self._window_type - + def set_window_type(self, window_type): self._window_type = window_type - + def get_window_size(self): return self._window_size - + def set_window_size(self, window_size): self._window_size = window_size - + def get_next_frame_size(self): return self._frame_size @@ -191,11 +200,14 @@ class PeakDetection(object): "Find and return all spectral peaks in a given frame of audio" peaks = [] return peaks - + def find_peaks(self, audio): - """Find and return all spectral peaks in a given audio signal. - If the signal contains more than 1 frame worth of audio, it will be broken - up into separate frames, with a list of peaks returned for each frame.""" + """ + Find and return all spectral peaks in a given audio signal. + If the signal contains more than 1 frame worth of audio, + it will be broken up into separate frames, with a list of peaks + returned for each frame. + """ self.frames = [] pos = 0 while pos < len(audio): @@ -205,17 +217,19 @@ class PeakDetection(object): # get the next frame frame = Frame() frame.size = self.frame_size - frame.audio = audio[pos:pos+self.frame_size] + frame.audio = audio[pos:pos + self.frame_size] # pad if necessary if len(frame.audio) < self.frame_size: - frame.audio = np.hstack((frame.audio, - simpl.zeros(self.frame_size - len(frame.audio)))) + frame.audio = np.hstack(( + frame.audio, np.zeros(self.frame_size - len(frame.audio), + dtype=simpl.dtype) + )) # find peaks frame.peaks = self.find_peaks_in_frame(frame) self.frames.append(frame) pos += self.hop_size return self.frames - + class PartialTracking(object): "Link spectral peaks from consecutive frames to form partials" @@ -225,7 +239,7 @@ class PartialTracking(object): self._min_partial_length = 0 self._max_gap = 2 self.frames = [] - + # properties sampling_rate = property(lambda self: self.get_sampling_rate(), lambda self, x: self.set_sampling_rate(x)) @@ -235,36 +249,36 @@ class PartialTracking(object): lambda self, x: self.set_min_partial_length(x)) max_gap = property(lambda self: self.get_max_gap(), lambda self, x: self.set_max_gap(x)) - + def get_sampling_rate(self): return self._sampling_rate - + def set_sampling_rate(self, sampling_rate): self._sampling_rate = sampling_rate - + def get_max_partials(self): return self._max_partials - + def set_max_partials(self, num_partials): self._max_partials = num_partials - + def get_min_partial_length(self): return self._min_partial_length - + def set_min_partial_length(self, length): self._min_partial_length = length - + def get_max_gap(self): return self._max_gap - + def set_max_gap(self, gap): self._max_gap = gap - + def update_partials(self, frame): "Streamable (real-time) partial-tracking." peaks = [None for i in range(self.max_partials)] return peaks - + def find_partials(self, frames): """Find partials from the sinusoidal peaks in a list of Frames""" self.frames = [] @@ -273,7 +287,7 @@ class PartialTracking(object): self.frames.append(frame) return self.frames - + class Synthesis(object): "Synthesise audio from spectral analysis data" def __init__(self): @@ -281,7 +295,7 @@ class Synthesis(object): self._hop_size = 512 self._max_partials = 100 self._sampling_rate = 44100 - + # properties frame_size = property(lambda self: self.get_frame_size(), lambda self, x: self.set_frame_size(x)) @@ -293,46 +307,46 @@ class Synthesis(object): lambda self, x: self.set_max_partials(x)) sampling_rate = property(lambda self: self.get_sampling_rate(), lambda self, x: self.set_sampling_rate(x)) - + def get_frame_size(self): return self._frame_size - + def set_frame_size(self, frame_size): self._frame_size = frame_size def get_hop_size(self): return self._hop_size - + def set_hop_size(self, hop_size): self._hop_size = hop_size - + def get_max_partials(self): return self._max_partials - + def set_max_partials(self, num_partials): self._max_partials = num_partials - + def get_sampling_rate(self): return self._sampling_rate - + def set_sampling_rate(self, sampling_rate): self._sampling_rate = sampling_rate def synth_frame(self, frame): "Synthesises a frame of audio, given a list of peaks from tracks" raise Exception("NotYetImplemented") - + def synth(self, frames): "Synthesise audio from the given partials" - audio_out = simpl.array([]) + audio_out = np.array([], dtype=simpl.dtype) for frame in frames: audio_out = np.hstack((audio_out, self.synth_frame(frame))) return audio_out - - + + class Residual(object): "Calculate a residual signal" - + def __init__(self): self._hop_size = 512 self._frame_size = 512 @@ -344,36 +358,30 @@ class Residual(object): def get_frame_size(self): return self._frame_size - + def set_frame_size(self, frame_size): self._frame_size = frame_size def get_hop_size(self): return self._hop_size - + def set_hop_size(self, hop_size): self._hop_size = hop_size def residual_frame(self, synth, original): "Computes the residual signal for a frame of audio" raise Exception("NotYetImplemented") - + def find_residual(self, synth, original): "Calculate and return the residual signal" - # pad the signals if necessary - if len(synth) % self.hop_size != 0: - synth = np.hstack((synth, np.zeros(self.hop_size - (len(synth) % self.hop_size)))) - if len(original) % self.hop_size != 0: - original = np.hstack((original, np.zeros(self.hop_size - (len(original) % self.hop_size)))) - num_frames = len(original) / self.hop_size - residual = simpl.array([]) + residual = np.array([], dtype=simpl.dtype) sample_offset = 0 for i in range(num_frames): - synth_frame = synth[sample_offset:sample_offset+self.hop_size] - original_frame = original[sample_offset:sample_offset+self.hop_size] - residual = np.hstack((residual, + synth_frame = synth[sample_offset:sample_offset + self.hop_size] + original_frame = original[sample_offset:sample_offset + self.hop_size] + residual = np.hstack((residual, self.residual_frame(synth_frame, original_frame))) sample_offset += self.hop_size return residual @@ -381,24 +389,17 @@ class Residual(object): def synth_frame(self, synth, original): "Calculate and return one frame of the synthesised residual signal" raise Exception("NotYetImplemented") - + def synth(self, synth, original): "Calculate and return a synthesised residual signal" - # pad the signals if necessary - if len(synth) % self.hop_size != 0: - synth = np.hstack((synth, np.zeros(self.hop_size - (len(synth) % self.hop_size)))) - if len(original) % self.hop_size != 0: - original = np.hstack((original, np.zeros(self.hop_size - (len(original) % self.hop_size)))) - num_frames = len(original) / self.hop_size - residual = simpl.array([]) + residual = np.array([], dtype=simpl.dtype) sample_offset = 0 for i in range(num_frames): - synth_frame = synth[sample_offset:sample_offset+self.hop_size] - original_frame = original[sample_offset:sample_offset+self.hop_size] - residual = np.hstack((residual, + synth_frame = synth[sample_offset:sample_offset + self.hop_size] + original_frame = original[sample_offset:sample_offset + self.hop_size] + residual = np.hstack((residual, self.synth_frame(synth_frame, original_frame))) sample_offset += self.hop_size return residual - diff --git a/simpl/sms.py b/simpl/sms.py index 37d8f81..e683ac2 100644 --- a/simpl/sms.py +++ b/simpl/sms.py @@ -1,9 +1,11 @@ +import numpy as np import simpl -from simpl import simplsms +import simpl.simplsms as simplsms + class SMSPeakDetection(simpl.PeakDetection): "Sinusoidal peak detection using SMS" - + def __init__(self): simpl.PeakDetection.__init__(self) simplsms.sms_init() @@ -27,17 +29,18 @@ class SMSPeakDetection(simpl.PeakDetection): if simplsms.sms_initAnalysis(self._analysis_params) != 0: raise Exception("Error allocating memory for analysis_params") self._peaks = simplsms.SMS_SpectralPeaks(self._max_peaks) - # By default, SMS will change the size of the frames being read depending on the - # detected fundamental frequency (if any) of the input sound. To prevent this - # behaviour (useful when comparing different analysis algorithms), set the + # By default, SMS will change the size of the frames being read + # depending on the detected fundamental frequency (if any) of the + # input sound. To prevent this behaviour (useful when comparing + # different analysis algorithms), set the # _static_frame_size variable to True self._static_frame_size = False - + def __del__(self): simplsms.sms_freeAnalysis(self._analysis_params) simplsms.sms_freeSpectralPeaks(self._peaks) simplsms.sms_free() - + # properties max_frequency = property(lambda self: self.get_max_frequency(), lambda self, x: self.set_max_frequency(x)) @@ -59,62 +62,62 @@ class SMSPeakDetection(simpl.PeakDetection): lambda self, x: self.set_format(x)) pre_emphasis = property(lambda self: self.get_pre_emphasis(), lambda self, x: self.set_pre_emphasis(x)) - + def get_max_frequency(self): return self._analysis_params.fHighestFreq - + def set_max_frequency(self, max_frequency): self._analysis_params.fHighestFreq = max_frequency - + def get_default_fundamental(self): return self._analysis_params.fDefaultFundamental - + def set_default_fundamental(self, default_fundamental): self._analysis_params.fDefaultFundamental = default_fundamental - + def get_max_frame_delay(self): return self._analysis_params.iMaxDelayFrames - + def set_max_frame_delay(self, max_frame_delay): simplsms.sms_freeAnalysis(self._analysis_params) self._analysis_params.iMaxDelayFrames = max_frame_delay if simplsms.sms_initAnalysis(self._analysis_params) != 0: raise Exception("Error allocating memory for analysis_params") - + def get_analysis_delay(self): return self._analysis_params.analDelay - + def set_analysis_delay(self, analysis_delay): simplsms.sms_freeAnalysis(self._analysis_params) self._analysis_params.analDelay = analysis_delay if simplsms.sms_initAnalysis(self._analysis_params) != 0: raise Exception("Error allocating memory for analysis_params") - + def get_min_good_frames(self): return self._analysis_params.minGoodFrames - + def set_min_good_frames(self, min_good_frames): simplsms.sms_freeAnalysis(self._analysis_params) self._analysis_params.minGoodFrames = min_good_frames if simplsms.sms_initAnalysis(self._analysis_params) != 0: raise Exception("Error allocating memory for analysis_params") - + def get_min_frequency(self): return self._analysis_params.fLowestFundamental - + def set_min_frequency(self, min_frequency): self._analysis_params.fLowestFundamental = min_frequency self._analysis_params.fLowestFreq = min_frequency - + def get_min_peak_amp(self): return self._analysis_params.fMinPeakMag - + def set_min_peak_amp(self, min_peak_amp): self._analysis_params.fMinPeakMag = min_peak_amp def get_clean_tracks(self): return self._analysis_params.iCleanTracks - + def set_clean_tracks(self, x): simplsms.sms_freeAnalysis(self._analysis_params) self._analysis_params.iCleanTracks = x @@ -123,7 +126,7 @@ class SMSPeakDetection(simpl.PeakDetection): def get_format(self): return self._analysis_params.iFormat - + def set_format(self, x): simplsms.sms_freeAnalysis(self._analysis_params) self._analysis_params.iFormat = x @@ -138,10 +141,10 @@ class SMSPeakDetection(simpl.PeakDetection): self._analysis_params.preEmphasis = x if simplsms.sms_initAnalysis(self._analysis_params) != 0: raise Exception("Error allocating memory for analysis_params") - + def get_hop_size(self): return self._analysis_params.sizeHop - + def set_hop_size(self, hop_size): simplsms.sms_freeAnalysis(self._analysis_params) self._analysis_params.iFrameRate = self.sampling_rate / hop_size @@ -150,15 +153,15 @@ class SMSPeakDetection(simpl.PeakDetection): def get_max_peaks(self): return self._analysis_params.maxPeaks - + def set_max_peaks(self, max_peaks): simplsms.sms_freeAnalysis(self._analysis_params) simplsms.sms_freeSpectralPeaks(self._peaks) # make sure the new max is less than SMS_MAX_NPEAKS if max_peaks > simplsms.SMS_MAX_NPEAKS: print "Warning: max peaks (" + str(max_peaks) + ")", - print "set to more than the maximum number of peaks possible in libsms." - print " Setting to", simplsms.SMS_MAX_NPEAKS, "instead." + print "set to more than the max. no. peaks possible in libsms." + print " Setting to", simplsms.SMS_MAX_NPEAKS, "instead." max_peaks = simplsms.SMS_MAX_NPEAKS # set analysis params self._max_peaks = max_peaks @@ -172,30 +175,30 @@ class SMSPeakDetection(simpl.PeakDetection): def get_sampling_rate(self): return self._analysis_params.iSamplingRate - + def set_sampling_rate(self, sampling_rate): self._analysis_params.iSamplingRate = sampling_rate simplsms.sms_freeAnalysis(self._analysis_params) if simplsms.sms_initAnalysis(self._analysis_params) != 0: raise Exception("Error allocating memory for analysis_params") - + def set_window_size(self, window_size): self._window_size = window_size self._analysis_params.iDefaultSizeWindow = window_size def get_next_frame_size(self): return self._analysis_params.sizeNextRead - + def find_peaks_in_frame(self, frame): "Find and return all spectral peaks in a given frame of audio" current_peaks = [] - num_peaks = simplsms.sms_findPeaks(frame.audio, - self._analysis_params, + num_peaks = simplsms.sms_findPeaks(frame.audio, + self._analysis_params, self._peaks) if num_peaks > 0: - amps = simpl.zeros(num_peaks) - freqs = simpl.zeros(num_peaks) - phases = simpl.zeros(num_peaks) + amps = np.zeros(num_peaks, dtype=simpl.dtype) + freqs = np.zeros(num_peaks, dtype=simpl.dtype) + phases = np.zeros(num_peaks, dtype=simpl.dtype) self._peaks.getFreq(freqs) self._peaks.getMag(amps) self._peaks.getPhase(phases) @@ -206,11 +209,14 @@ class SMSPeakDetection(simpl.PeakDetection): p.phase = phases[i] current_peaks.append(p) return current_peaks - + def find_peaks(self, audio): - """Find and return all spectral peaks in a given audio signal. - If the signal contains more than 1 frame worth of audio, it will be broken - up into separate frames, with a list of peaks returned for each frame.""" + """ + Find and return all spectral peaks in a given audio signal. + If the signal contains more than 1 frame worth of audio, + it will be broken up into separate frames, with a list of + peaks returned for each frame. + """ # TODO: This hops by frame size rather than hop size in order to # make sure the results are the same as with libsms. Make sure # we have the same number of frames as the other algorithms. @@ -219,7 +225,7 @@ class SMSPeakDetection(simpl.PeakDetection): pos = 0 # account for SMS analysis delay # need an extra (max_frame_delay - 1) frames - num_samples = (len(audio) - self.hop_size) + ((self.max_frame_delay -1) * self.hop_size) + num_samples = (len(audio) - self.hop_size) + ((self.max_frame_delay - 1) * self.hop_size) while pos < num_samples: # get the next frame size if not self._static_frame_size: @@ -227,17 +233,17 @@ class SMSPeakDetection(simpl.PeakDetection): # get the next frame frame = simpl.Frame() frame.size = self.frame_size - frame.audio = audio[pos:pos+self.frame_size] + frame.audio = audio[pos:pos + self.frame_size] # find peaks frame.peaks = self.find_peaks_in_frame(frame) self.frames.append(frame) pos += self.frame_size return self.frames - + class SMSPartialTracking(simpl.PartialTracking): "Partial tracking using SMS" - + def __init__(self): simpl.PartialTracking.__init__(self) simplsms.sms_init() @@ -245,7 +251,7 @@ class SMSPartialTracking(simpl.PartialTracking): simplsms.sms_initAnalParams(self._analysis_params) self._analysis_params.iSamplingRate = self.sampling_rate self._analysis_params.fHighestFreq = 20000 - self._analysis_params.iMaxDelayFrames = 4 # minimum frame delay with libsms + self._analysis_params.iMaxDelayFrames = 4 # libsms minimum self._analysis_params.analDelay = 0 self._analysis_params.minGoodFrames = 1 self._analysis_params.iCleanTracks = 0 @@ -259,7 +265,7 @@ class SMSPartialTracking(simpl.PartialTracking): simplsms.sms_fillHeader(self._sms_header, self._analysis_params, "simpl") self._analysis_frame = simplsms.SMS_Data() simplsms.sms_allocFrameH(self._sms_header, self._analysis_frame) - + def __del__(self): simplsms.sms_freeAnalysis(self._analysis_params) simplsms.sms_freeFrame(self._analysis_frame) @@ -285,19 +291,19 @@ class SMSPartialTracking(simpl.PartialTracking): def get_max_frequency(self): return self._analysis_params.fHighestFreq - + def set_max_frequency(self, max_frequency): self._analysis_params.fHighestFreq = max_frequency - + def get_default_fundamental(self): return self._analysis_params.fDefaultFundamental - + def set_default_fundamental(self, default_fundamental): self._analysis_params.fDefaultFundamental = default_fundamental - + def get_max_frame_delay(self): return self._analysis_params.iMaxDelayFrames - + def set_max_frame_delay(self, max_frame_delay): simplsms.sms_freeAnalysis(self._analysis_params) self._analysis_params.iMaxDelayFrames = max_frame_delay @@ -306,7 +312,7 @@ class SMSPartialTracking(simpl.PartialTracking): def get_analysis_delay(self): return self._analysis_params.analDelay - + def set_analysis_delay(self, x): simplsms.sms_freeAnalysis(self._analysis_params) self._analysis_params.analDelay = x @@ -315,7 +321,7 @@ class SMSPartialTracking(simpl.PartialTracking): def get_min_good_frames(self): return self._analysis_params.minGoodFrames - + def set_min_good_frames(self, x): simplsms.sms_freeAnalysis(self._analysis_params) self._analysis_params.minGoodFrames = x @@ -324,7 +330,7 @@ class SMSPartialTracking(simpl.PartialTracking): def get_clean_tracks(self): return self._analysis_params.iCleanTracks - + def set_clean_tracks(self, x): simplsms.sms_freeAnalysis(self._analysis_params) self._analysis_params.iCleanTracks = x @@ -333,7 +339,7 @@ class SMSPartialTracking(simpl.PartialTracking): def get_format(self): return self._analysis_params.iFormat - + def set_format(self, x): simplsms.sms_freeAnalysis(self._analysis_params) self._analysis_params.iFormat = x @@ -351,7 +357,7 @@ class SMSPartialTracking(simpl.PartialTracking): def get_max_partials(self): return self._analysis_params.nTracks - + def set_max_partials(self, max_partials): simplsms.sms_freeAnalysis(self._analysis_params) simplsms.sms_freeFrame(self._analysis_frame) @@ -363,14 +369,14 @@ class SMSPartialTracking(simpl.PartialTracking): raise Exception("Error allocating memory for analysis_params") simplsms.sms_fillHeader(self._sms_header, self._analysis_params, "simpl") simplsms.sms_allocFrameH(self._sms_header, self._analysis_frame) - + def update_partials(self, frame): "Streamable (real-time) partial-tracking." # load Peak amplitudes, frequencies and phases into arrays num_peaks = len(frame.peaks) - amps = simpl.zeros(num_peaks) - freqs = simpl.zeros(num_peaks) - phases = simpl.zeros(num_peaks) + amps = np.zeros(num_peaks, dtype=simpl.dtype) + freqs = np.zeros(num_peaks, dtype=simpl.dtype) + phases = np.zeros(num_peaks, dtype=simpl.dtype) for i in range(num_peaks): peak = frame.peaks[i] amps[i] = peak.amplitude @@ -381,9 +387,9 @@ class SMSPartialTracking(simpl.PartialTracking): # SMS partial tracking simplsms.sms_findPartials(self._analysis_frame, self._analysis_params) # read values back into amps, freqs, phases - amps = simpl.zeros(self.max_partials) - freqs = simpl.zeros(self.max_partials) - phases = simpl.zeros(self.max_partials) + amps = np.zeros(self.max_partials, dtype=simpl.dtype) + freqs = np.zeros(self.max_partials, dtype=simpl.dtype) + phases = np.zeros(self.max_partials, dtype=simpl.dtype) self._analysis_frame.getSinAmp(amps) self._analysis_frame.getSinFreq(freqs) self._analysis_frame.getSinPhase(phases) @@ -395,7 +401,7 @@ class SMSPartialTracking(simpl.PartialTracking): p.phase = phases[i] peaks.append(p) return peaks - + def find_partials(self, frames): """Find partials from the sinusoidal peaks in a list of Frames""" self.frames = [] @@ -408,32 +414,34 @@ class SMSPartialTracking(simpl.PartialTracking): self.frames = self.frames[self.max_frame_delay:] return self.frames + class SMSSynthesis(simpl.Synthesis): "Sinusoidal resynthesis using SMS" - + def __init__(self): simpl.Synthesis.__init__(self) simplsms.sms_init() - self._synth_params = simplsms.SMS_SynthParams() + self._synth_params = simplsms.SMS_SynthParams() simplsms.sms_initSynthParams(self._synth_params) self._synth_params.iSamplingRate = self._sampling_rate self._synth_params.iDetSynthType = simplsms.SMS_DET_SIN self._synth_params.iSynthesisType = simplsms.SMS_STYPE_DET self._synth_params.iStochasticType = simplsms.SMS_STOC_NONE - self._synth_params.sizeHop = self._hop_size + self._synth_params.sizeHop = self._hop_size self._synth_params.nTracks = self._max_partials self._synth_params.deEmphasis = 0 simplsms.sms_initSynth(self._synth_params) - self._current_frame = simpl.zeros(self._hop_size) + self._current_frame = np.zeros(self._hop_size, dtype=simpl.dtype) self._analysis_frame = simplsms.SMS_Data() - simplsms.sms_allocFrame(self._analysis_frame, self.max_partials, - self.num_stochastic_coeffs, 1, self.stochastic_type, 0) + simplsms.sms_allocFrame(self._analysis_frame, self.max_partials, + self.num_stochastic_coeffs, 1, + self.stochastic_type, 0) def __del__(self): simplsms.sms_freeFrame(self._analysis_frame) simplsms.sms_freeSynth(self._synth_params) simplsms.sms_free() - + # properties synthesis_type = property(lambda self: self.get_synthesis_type(), lambda self, x: self.set_synthesis_type(x)) @@ -447,82 +455,85 @@ class SMSSynthesis(simpl.Synthesis): lambda self, x: self.set_original_sampling_rate(x)) original_hop_size = property(lambda self: self.get_original_hop_size(), lambda self, x: self.set_original_hop_size(x)) - + def get_hop_size(self): return self._synth_params.sizeHop - + def set_hop_size(self, hop_size): simplsms.sms_freeSynth(self._synth_params) self._synth_params.sizeHop = hop_size simplsms.sms_initSynth(self._synth_params) - self._current_frame = simpl.zeros(hop_size) - + self._current_frame = np.zeros(hop_size, dtype=simpl.dtype) + def get_max_partials(self): return self._synth_params.nTracks - + def set_max_partials(self, max_partials): simplsms.sms_freeSynth(self._synth_params) simplsms.sms_freeFrame(self._analysis_frame) self._synth_params.nTracks = max_partials simplsms.sms_initSynth(self._synth_params) - simplsms.sms_allocFrame(self._analysis_frame, max_partials, - self.num_stochastic_coeffs, 1, self.stochastic_type, 0) - + simplsms.sms_allocFrame(self._analysis_frame, max_partials, + self.num_stochastic_coeffs, 1, + self.stochastic_type, 0) + def get_sampling_rate(self): return self._synth_params.iSamplingRate - + def set_sampling_rate(self, sampling_rate): self._synth_params.iSamplingRate = sampling_rate - + def get_synthesis_type(self): return self._synth_params.iSynthesisType - + def set_synthesis_type(self, synthesis_type): self._synth_params.iSynthesisType = synthesis_type - + def get_det_synthesis_type(self): return self._synth_params.iDetSynthesisType - + def set_det_synthesis_type(self, det_synthesis_type): self._synth_params.iDetSynthType = det_synthesis_type def get_num_stochastic_coeffs(self): return self._synth_params.nStochasticCoeff - + def set_num_stochastic_coeffs(self, num_stochastic_coeffs): self._synth_params.nStochasticCoeff = num_stochastic_coeffs simplsms.sms_freeFrame(self._analysis_frame) - simplsms.sms_allocFrame(self._analysis_frame, self.max_partials, - num_stochastic_coeffs, 1, self.stochastic_type, 0) - + simplsms.sms_allocFrame(self._analysis_frame, self.max_partials, + num_stochastic_coeffs, 1, + self.stochastic_type, 0) + def get_stochastic_type(self): return self._synth_params.iStochasticType - + def set_stochastic_type(self, stochastic_type): simplsms.sms_freeSynth(self._synth_params) simplsms.sms_freeFrame(self._analysis_frame) self._synth_params.iStochasticType = stochastic_type simplsms.sms_initSynth(self._synth_params) - simplsms.sms_allocFrame(self._analysis_frame, self.max_partials, - self.num_stochastic_coeffs, 1, stochastic_type, 0) - + simplsms.sms_allocFrame(self._analysis_frame, self.max_partials, + self.num_stochastic_coeffs, 1, + stochastic_type, 0) + def get_original_sampling_rate(self): return self._synth_params.iOriginalSRate - + def set_original_sampling_rate(self, sampling_rate): self._synth_params.iOriginalSRate = sampling_rate - + def get_original_hop_size(self): return self._synth_params.origSizeHop - + def set_original_hop_size(self, hop_size): self._synth_params.origSizeHop = hop_size def synth_frame(self, frame): "Synthesises a frame of audio" - amps = simpl.zeros(self.max_partials) - freqs = simpl.zeros(self.max_partials) - phases = simpl.zeros(self.max_partials) + amps = np.zeros(self.max_partials, dtype=simpl.dtype) + freqs = np.zeros(self.max_partials, dtype=simpl.dtype) + phases = np.zeros(self.max_partials, dtype=simpl.dtype) num_partials = min(self.max_partials, len(frame.partials)) for i in range(num_partials): amps[i] = frame.partials[i].amplitude @@ -531,13 +542,15 @@ class SMSSynthesis(simpl.Synthesis): self._analysis_frame.setSinAmp(amps) self._analysis_frame.setSinFreq(freqs) self._analysis_frame.setSinPha(phases) - simplsms.sms_synthesize(self._analysis_frame, self._current_frame, self._synth_params) + simplsms.sms_synthesize(self._analysis_frame, + self._current_frame, + self._synth_params) return self._current_frame - + class SMSResidual(simpl.Residual): "SMS residual component" - + def __init__(self): simpl.Residual.__init__(self) simplsms.sms_init() @@ -545,51 +558,44 @@ class SMSResidual(simpl.Residual): simplsms.sms_initResidualParams(self._residual_params) self._residual_params.hopSize = self._hop_size simplsms.sms_initResidual(self._residual_params) - + def __del__(self): simplsms.sms_freeResidual(self._residual_params) simplsms.sms_free() def get_hop_size(self): return self._residual_params.hopSize - + def set_hop_size(self, hop_size): simplsms.sms_freeResidual(self._residual_params) self._residual_params.hopSize = hop_size simplsms.sms_initResidual(self._residual_params) - + def residual_frame(self, synth, original): "Computes the residual signal for a frame of audio" simplsms.sms_findResidual(synth, original, self._residual_params) - residual = simpl.zeros(self._residual_params.hopSize) + residual = np.zeros(self._residual_params.hopSize, dtype=simpl.dtype) self._residual_params.getResidual(residual) return residual def find_residual(self, synth, original): "Calculate and return the residual signal" - import numpy as np - # pad the signals if necessary - if len(synth) % self.hop_size != 0: - synth = np.hstack((synth, np.zeros(self.hop_size - (len(synth) % self.hop_size)))) - if len(original) % self.hop_size != 0: - original = np.hstack((original, np.zeros(self.hop_size - (len(original) % self.hop_size)))) - num_frames = len(original) / self.hop_size - residual = simpl.array([]) + residual = np.array([], dtype=simpl.dtype) sample_offset = 0 for i in range(num_frames): - synth_frame = synth[sample_offset:sample_offset+self.hop_size] - original_frame = original[sample_offset:sample_offset+self.hop_size] - residual = np.hstack((residual, - self.residual_frame(synth_frame, original_frame))) + synth_frame = synth[sample_offset:sample_offset + self.hop_size] + original_frame = original[sample_offset:sample_offset + self.hop_size] + residual = np.hstack(( + residual, self.residual_frame(synth_frame, original_frame) + )) sample_offset += self.hop_size return residual def synth_frame(self, synth, original): "Calculate and return one frame of the synthesised residual signal" residual = self.residual_frame(synth, original) - approx = simpl.zeros(self._residual_params.hopSize) + approx = np.zeros(self._residual_params.hopSize, dtype=simpl.dtype) simplsms.sms_approxResidual(residual, approx, self._residual_params) return approx - diff --git a/simpl/sndobj.py b/simpl/sndobj.py index e64fcc3..90b8fee 100644 --- a/simpl/sndobj.py +++ b/simpl/sndobj.py @@ -1,62 +1,63 @@ import simpl -from simpl import simplsndobj +import simpl.simplsndobj as simplsndobj import numpy as np + class SndObjPeakDetection(simpl.PeakDetection): - "Sinusoidal peak detection using the SndObj library (Instantaneous Frequency)" + "Sinusoidal peak detection using the SndObj library" def __init__(self): simpl.PeakDetection.__init__(self) self._input = simplsndobj.SndObj() self._input.SetVectorSize(self.frame_size) self._window = simplsndobj.HammingTable(self.frame_size, 0.5) - self._ifgram = simplsndobj.IFGram(self._window, self._input, 1, + self._ifgram = simplsndobj.IFGram(self._window, self._input, 1, self.frame_size, self.hop_size) self._threshold = 0.003 self._analysis = simplsndobj.SinAnal(self._ifgram, self._threshold, self.max_peaks) - + # properties threshold = property(lambda self: self.get_threshold(), lambda self, x: self.set_threshold(x)) - + def set_frame_size(self, frame_size): "Set the analysis frame size" self._input.SetVectorSize(frame_size) if self.window_type == "hamming": self._window = simplsndobj.HammingTable(frame_size, 0.5) - elif self.window_type >=0 and self.window_type <= 1: + elif self.window_type >= 0 and self.window_type <= 1: self._window = simplsndobj.HammingTable(frame_size, self.window_type) - self._ifgram.Connect("window", self._window) + self._ifgram.Connect("window", self._window) self._ifgram.Set("fft size", frame_size) self._frame_size = frame_size - + def set_hop_size(self, hop_size): self._ifgram.Set("hop size", hop_size) self._hop_size = hop_size - + def set_max_peaks(self, max_peaks): "Set the maximum number of peaks detected" self._analysis.Set("max tracks", max_peaks) self._max_peaks = max_peaks - + def set_window_type(self, window_type): "Set the analysis window type" if window_type == "hamming": self._window = simplsndobj.HammingTable(self.frame_size, 0.5) - elif window_type >=0 and window_type <= 1: + elif window_type >= 0 and window_type <= 1: self._window = simplsndobj.HammingTable(self.frame_size, window_type) else: raise Exception("UnknownWindowType") - self._ifgram.Connect("window", self._window) - self._window_type = window_type - + self._ifgram.Connect("window", self._window) + self._window_type = window_type + def get_threshold(self): return self._threshold - + def set_threshold(self, threshold): self._analysis.Set("threshold", threshold) self._threshold = threshold - + def find_peaks_in_frame(self, frame): "Find and return all spectral peaks in a given frame of audio" peaks = [] @@ -64,12 +65,13 @@ class SndObjPeakDetection(simpl.PeakDetection): self._input.DoProcess() self._ifgram.DoProcess() num_peaks_found = self._analysis.FindPeaks() + # loop through analysis output and create peak objects for i in range(num_peaks_found): p = simpl.Peak() - p.amplitude = self._analysis.Output(i*3) - p.frequency = self._analysis.Output((i*3)+1) - p.phase = self._analysis.Output((i*3)+2) + p.amplitude = self._analysis.Output(i * 3) + p.frequency = self._analysis.Output((i * 3) + 1) + p.phase = self._analysis.Output((i * 3) + 2) if not peaks: peaks.append(p) else: @@ -80,29 +82,31 @@ class SndObjPeakDetection(simpl.PeakDetection): peaks.remove(peaks[-1]) peaks.append(p) return peaks - + class SndObjPartialTracking(simpl.PartialTracking): "Partial tracking using the algorithm from the Sound Object Library" def __init__(self): simpl.PartialTracking.__init__(self) - self._threshold = 0.003 # TODO: make this a property - self._num_bins = 1025 # TODO: make this a property - self._analysis = simplsndobj.SinAnal(simplsndobj.SndObj(), self._num_bins, - self._threshold, self.max_partials) - + self._threshold = 0.003 + self._num_bins = 1025 + self._analysis = simplsndobj.SinAnal( + simplsndobj.SndObj(), self._num_bins, + self._threshold, self.max_partials + ) + def set_max_partials(self, num_partials): self._analysis.Set("max tracks", num_partials) self._max_partials = num_partials - + def update_partials(self, frame): "Streamable (real-time) partial-tracking." partials = [] # load Peak amplitudes, frequencies and phases into arrays num_peaks = len(frame.peaks) - amps = simpl.zeros(num_peaks) - freqs = simpl.zeros(num_peaks) - phases = simpl.zeros(num_peaks) + amps = np.zeros(num_peaks, dtype=simpl.dtype) + freqs = np.zeros(num_peaks, dtype=simpl.dtype) + phases = np.zeros(num_peaks, dtype=simpl.dtype) for i in range(num_peaks): peak = frame.peaks[i] amps[i] = peak.amplitude @@ -116,23 +120,23 @@ class SndObjPartialTracking(simpl.PartialTracking): num_partials = self._analysis.GetTracks() for i in range(num_partials): peak = simpl.Peak() - peak.amplitude = self._analysis.Output(i*3) - peak.frequency = self._analysis.Output((i*3)+1) - peak.phase = self._analysis.Output((i*3)+2) + peak.amplitude = self._analysis.Output(i * 3) + peak.frequency = self._analysis.Output((i * 3) + 1) + peak.phase = self._analysis.Output((i * 3) + 2) partials.append(peak) for i in range(num_partials, self.max_partials): peak = simpl.Peak() partials.append(peak) return partials - - + + class SimplSndObjAnalysisWrapper(simplsndobj.SinAnal): - """An object that takes simpl Peaks and presents them as SndObj analysis + """An object that takes simpl Peaks and presents them as SndObj analysis data to the SndObj synthesis objects.""" def __init__(self): simplsndobj.SinAnal.__init__(self) self.partials = [] - + def GetTracks(self): return len(self.partials) @@ -142,7 +146,7 @@ class SimplSndObjAnalysisWrapper(simplsndobj.SinAnal): else: # TODO: what should this return if no matching partial found? return 0 - + def Output(self, position): peak = int(position) / 3 if peak > len(self.partials): @@ -156,8 +160,8 @@ class SimplSndObjAnalysisWrapper(simplsndobj.SinAnal): return self.partials[peak].frequency elif data_field is 2: return self.partials[peak].phase - - + + class SndObjSynthesis(simpl.Synthesis): "Sinusoidal resynthesis using the SndObj library" def __init__(self, synthesis_type='adsyn'): @@ -172,17 +176,17 @@ class SndObjSynthesis(simpl.Synthesis): self._table, 1, self.hop_size) else: raise Exception("UnknownSynthesisType") - self._current_frame = simpl.zeros(self.hop_size) - + self._current_frame = np.zeros(self.hop_size, dtype=np.dtype) + def set_hop_size(self, hop_size): self._synth.SetVectorSize(hop_size) self._hop_size = hop_size - self._current_frame = simpl.zeros(hop_size) - + self._current_frame = np.zeros(hop_size, dtype=np.dtype) + def set_max_partials(self, num_partials): self._synth.Set('max tracks', num_partials) self._max_partials = num_partials - + def synth_frame(self, frame): "Synthesises a frame of audio, given a list of peaks from tracks" self._analysis.partials = frame.partials @@ -191,4 +195,3 @@ class SndObjSynthesis(simpl.Synthesis): self._synth.DoProcess() self._synth.PopOut(self._current_frame) return self._current_frame - |