Python audioop 模块,rms() 实例源码
我们从 Python 开源项目中,提取了以下 37 个代码示例,用于说明如何使用 audioop.rms()。
def test_issue7673(self):
    """Check that every audioop routine rejects the malformed fragments.

    Each ``(data, size)`` pair from ``INVALID_DATA`` is passed to every
    operation listed below, and each call must raise ``audioop.error``.
    """
    no_state = None
    for fragment, width in INVALID_DATA:
        target_width = width
        # (callable, positional arguments), in the order they are exercised.
        rejected_calls = (
            (audioop.getsample, (fragment, width, 0)),
            (audioop.max, (fragment, width)),
            (audioop.minmax, (fragment, width)),
            (audioop.avg, (fragment, width)),
            (audioop.rms, (fragment, width)),
            (audioop.avgpp, (fragment, width)),
            (audioop.maxpp, (fragment, width)),
            (audioop.cross, (fragment, width)),
            (audioop.mul, (fragment, width, 1.0)),
            (audioop.tomono, (fragment, width, 0.5, 0.5)),
            (audioop.tostereo, (fragment, width, 0.5, 0.5)),
            (audioop.add, (fragment, fragment, width)),
            (audioop.bias, (fragment, width, 0)),
            (audioop.reverse, (fragment, width)),
            (audioop.lin2lin, (fragment, width, target_width)),
            (audioop.ratecv, (fragment, width, 1, 1, 1, no_state)),
            (audioop.lin2ulaw, (fragment, width)),
            (audioop.lin2alaw, (fragment, width)),
            (audioop.lin2adpcm, (fragment, width, no_state)),
        )
        for operation, arguments in rejected_calls:
            self.assertRaises(audioop.error, operation, *arguments)
def main():
    """Run the alarm playback demo until interrupted from the keyboard.

    Starts an ``Audio`` source (``frames_size=1600``) with an ``RMS``
    element linked to it, then alternates playback of
    ``resources/alarm.mp3`` on two ``Player`` instances.  A
    ``KeyboardInterrupt`` raised during playback ends the loop, after
    which the audio source is stopped.
    """
    audio = Audio(frames_size=1600)
    meter = RMS()
    audio.link(meter)
    audio.start()

    here = os.path.dirname(__file__)
    alarm_path = os.path.realpath(os.path.join(here, 'resources/alarm.mp3'))
    alarm_uri = 'file://{}'.format(alarm_path)

    first_player = Player()
    second_player = Player()

    try:
        while True:
            first_player.play(alarm_uri)
            time.sleep(1)
            second_player.play(alarm_uri)
            time.sleep(3)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way out of the playback loop.
        pass

    audio.stop()
def test_issue7673(self):
    """Check that every audioop routine rejects the malformed fragments.

    Each ``(data, size)`` pair from ``INVALID_DATA`` is passed to every
    operation listed below, and each call must raise ``audioop.error``.
    """
    no_state = None
    for fragment, width in INVALID_DATA:
        target_width = width
        # (callable, positional arguments), in the order they are exercised.
        rejected_calls = (
            (audioop.getsample, (fragment, width, 0)),
            (audioop.max, (fragment, width)),
            (audioop.minmax, (fragment, width)),
            (audioop.avg, (fragment, width)),
            (audioop.rms, (fragment, width)),
            (audioop.avgpp, (fragment, width)),
            (audioop.maxpp, (fragment, width)),
            (audioop.cross, (fragment, width)),
            (audioop.mul, (fragment, width, 1.0)),
            (audioop.tomono, (fragment, width, 0.5, 0.5)),
            (audioop.tostereo, (fragment, width, 0.5, 0.5)),
            (audioop.add, (fragment, fragment, width)),
            (audioop.bias, (fragment, width, 0)),
            (audioop.reverse, (fragment, width)),
            (audioop.lin2lin, (fragment, width, target_width)),
            (audioop.ratecv, (fragment, width, 1, 1, 1, no_state)),
            (audioop.lin2ulaw, (fragment, width)),
            (audioop.lin2alaw, (fragment, width)),
            (audioop.lin2adpcm, (fragment, width, no_state)),
        )
        for operation, arguments in rejected_calls:
            self.assertRaises(audioop.error, operation, *arguments)
def test_issue7673(self):
    """Check that every audioop routine rejects the malformed fragments.

    Each ``(data, size)`` pair from ``INVALID_DATA`` is passed to every
    operation listed below, and each call must raise ``audioop.error``.
    """
    no_state = None
    for fragment, width in INVALID_DATA:
        target_width = width
        # (callable, positional arguments), in the order they are exercised.
        rejected_calls = (
            (audioop.getsample, (fragment, width, 0)),
            (audioop.max, (fragment, width)),
            (audioop.minmax, (fragment, width)),
            (audioop.avg, (fragment, width)),
            (audioop.rms, (fragment, width)),
            (audioop.avgpp, (fragment, width)),
            (audioop.maxpp, (fragment, width)),
            (audioop.cross, (fragment, width)),
            (audioop.mul, (fragment, width, 1.0)),
            (audioop.tomono, (fragment, width, 0.5, 0.5)),
            (audioop.tostereo, (fragment, width, 0.5, 0.5)),
            (audioop.add, (fragment, fragment, width)),
            (audioop.bias, (fragment, width, 0)),
            (audioop.reverse, (fragment, width)),
            (audioop.lin2lin, (fragment, width, target_width)),
            (audioop.ratecv, (fragment, width, 1, 1, 1, no_state)),
            (audioop.lin2ulaw, (fragment, width)),
            (audioop.lin2alaw, (fragment, width)),
            (audioop.lin2adpcm, (fragment, width, no_state)),
        )
        for operation, arguments in rejected_calls:
            self.assertRaises(audioop.error, operation, *arguments)
def test_issue7673(self):
    """Check that every audioop routine rejects the malformed fragments.

    Each ``(data, size)`` pair from ``INVALID_DATA`` is passed to every
    operation listed below, and each call must raise ``audioop.error``.
    """
    no_state = None
    for fragment, width in INVALID_DATA:
        target_width = width
        # (callable, positional arguments), in the order they are exercised.
        rejected_calls = (
            (audioop.getsample, (fragment, width, 0)),
            (audioop.max, (fragment, width)),
            (audioop.minmax, (fragment, width)),
            (audioop.avg, (fragment, width)),
            (audioop.rms, (fragment, width)),
            (audioop.avgpp, (fragment, width)),
            (audioop.maxpp, (fragment, width)),
            (audioop.cross, (fragment, width)),
            (audioop.mul, (fragment, width, 1.0)),
            (audioop.tomono, (fragment, width, 0.5, 0.5)),
            (audioop.tostereo, (fragment, width, 0.5, 0.5)),
            (audioop.add, (fragment, fragment, width)),
            (audioop.bias, (fragment, width, 0)),
            (audioop.reverse, (fragment, width)),
            (audioop.lin2lin, (fragment, width, target_width)),
            (audioop.ratecv, (fragment, width, 1, 1, 1, no_state)),
            (audioop.lin2ulaw, (fragment, width)),
            (audioop.lin2alaw, (fragment, width)),
            (audioop.lin2adpcm, (fragment, width, no_state)),
        )
        for operation, arguments in rejected_calls:
            self.assertRaises(audioop.error, operation, *arguments)
async def detect_squelch_level(self, detect_time=10, threshold=.8):
    """Listen for a while and derive a squelch level from the observed RMS.

    Audio blocks are pulled from ``self._source`` for roughly
    ``detect_time`` seconds and re-chunked with ``EvenChunkIterator``.
    Only chunks whose audio is exactly
    ``self._sample_size * self._sample_width`` bytes long contribute an
    RMS value.  The values are sorted and the one at fraction
    ``threshold`` of the way through the sorted list is stored in
    ``self.squelch_level`` and returned.

    This must be a coroutine: the body uses ``async with``, ``async for``
    and ``await``, none of which compile inside a plain ``def``.

    Raises:
        IndexError: if no full-sized chunk was captured.
    """
    start_time = time.time()
    end_time = start_time + detect_time
    audio_chunks = collections.deque()
    async with self._source.listen():
        async for block in self._source:
            if time.time() > end_time:
                break
            even_iter = EvenChunkIterator(block, self._sample_size)
            try:
                # Drain this block chunk by chunk, but stop as soon as the
                # detection window is over.
                while time.time() < end_time:
                    audio_chunks.append(await even_iter.__anext__())
            except StopAsyncIteration:
                # Block exhausted; continue with the next one.
                pass
    expected_bytes = self._sample_size * self._sample_width
    rms_vals = [audioop.rms(x.audio, self._sample_width)
                for x in audio_chunks
                if len(x.audio) == expected_bytes]
    ordered = sorted(rms_vals)
    # Clamp so threshold >= 1 selects the loudest value instead of indexing
    # past the end.  An empty list still raises IndexError, as before.
    index = min(int(threshold * len(ordered)), len(ordered) - 1)
    level = ordered[index:][0]
    self.squelch_level = level
    return level
def read(self, frame_size):
    """Read the next frame from ``self.buff`` and optionally update the meter.

    The frame counter is incremented on every call.  When ``self.volume``
    is not 1 the frame is passed through ``self._frame_vol``.  When
    ``self.draw`` is set and the frame counter is a multiple of
    ``self.frame_skip``, the frame's RMS (2-byte samples) is appended to
    ``self.rmss`` and the meter is redrawn relative to the largest RMS
    recorded so far.

    Args:
        frame_size: number of bytes to request from the buffer.

    Returns:
        The (possibly volume-adjusted) frame.
    """
    self.frame_count += 1
    frame = self.buff.read(frame_size)
    if self.volume != 1:
        frame = self._frame_vol(frame, self.volume, maxv=2)
    if self.draw and not self.frame_count % self.frame_skip:
        # these should be processed for every frame, but "overhead"
        rms = audioop.rms(frame, 2)
        self.rmss.append(rms)
        # A single max() pass replaces sorting the whole history on every
        # metered frame just to read its last element.
        max_rms = max(self.rmss)
        meter_text = 'avg rms: {:.2f}, max rms: {:.2f} '.format(self._avg(self.rmss), max_rms)
        # max(1, ...) guards the division while only silence has been seen.
        self._pprint_meter(rms / max(1, max_rms), text=meter_text, shift=True)
    return frame
def test_issue7673(self):
    """Check that every audioop routine rejects the malformed fragments.

    Each ``(data, size)`` pair from ``INVALID_DATA`` is passed to every
    operation listed below, and each call must raise ``audioop.error``.
    """
    no_state = None
    for fragment, width in INVALID_DATA:
        target_width = width
        # (callable, positional arguments), in the order they are exercised.
        rejected_calls = (
            (audioop.getsample, (fragment, width, 0)),
            (audioop.max, (fragment, width)),
            (audioop.minmax, (fragment, width)),
            (audioop.avg, (fragment, width)),
            (audioop.rms, (fragment, width)),
            (audioop.avgpp, (fragment, width)),
            (audioop.maxpp, (fragment, width)),
            (audioop.cross, (fragment, width)),
            (audioop.mul, (fragment, width, 1.0)),
            (audioop.tomono, (fragment, width, 0.5, 0.5)),
            (audioop.tostereo, (fragment, width, 0.5, 0.5)),
            (audioop.add, (fragment, fragment, width)),
            (audioop.bias, (fragment, width, 0)),
            (audioop.reverse, (fragment, width)),
            (audioop.lin2lin, (fragment, width, target_width)),
            (audioop.ratecv, (fragment, width, 1, 1, 1, no_state)),
            (audioop.lin2ulaw, (fragment, width)),
            (audioop.lin2alaw, (fragment, width)),
            (audioop.lin2adpcm, (fragment, width, no_state)),
        )
        for operation, arguments in rejected_calls:
            self.assertRaises(audioop.error, operation, *arguments)
def test_issue7673(self):
    """Check that every audioop routine rejects the malformed fragments.

    Each ``(data, size)`` pair from ``INVALID_DATA`` is passed to every
    operation listed below, and each call must raise ``audioop.error``.
    """
    no_state = None
    for fragment, width in INVALID_DATA:
        target_width = width
        # (callable, positional arguments), in the order they are exercised.
        rejected_calls = (
            (audioop.getsample, (fragment, width, 0)),
            (audioop.max, (fragment, width)),
            (audioop.minmax, (fragment, width)),
            (audioop.avg, (fragment, width)),
            (audioop.rms, (fragment, width)),
            (audioop.avgpp, (fragment, width)),
            (audioop.maxpp, (fragment, width)),
            (audioop.cross, (fragment, width)),
            (audioop.mul, (fragment, width, 1.0)),
            (audioop.tomono, (fragment, width, 0.5, 0.5)),
            (audioop.tostereo, (fragment, width, 0.5, 0.5)),
            (audioop.add, (fragment, fragment, width)),
            (audioop.bias, (fragment, width, 0)),
            (audioop.reverse, (fragment, width)),
            (audioop.lin2lin, (fragment, width, target_width)),
            (audioop.ratecv, (fragment, width, 1, 1, 1, no_state)),
            (audioop.lin2ulaw, (fragment, width)),
            (audioop.lin2alaw, (fragment, width)),
            (audioop.lin2adpcm, (fragment, width, no_state)),
        )
        for operation, arguments in rejected_calls:
            self.assertRaises(audioop.error, operation, *arguments)
def test_string(self):
data = 'abcd'
size = 2
self.assertRaises(TypeError, audioop.getsample, data, size, 0)
self.assertRaises(TypeError, audioop.max, data, size)
self.assertRaises(TypeError, audioop.minmax, data, size)
self.assertRaises(TypeError, audioop.avg, data, size)
self.assertRaises(TypeError, audioop.rms, data, size)
self.assertRaises(TypeError, audioop.avgpp, data, size)
self.assertRaises(TypeError, audioop.maxpp, data, size)
self.assertRaises(TypeError, audioop.cross, data, size)
self.assertRaises(TypeError, audioop.mul, data, size, 1.0)
self.assertRaises(TypeError, audioop.tomono, data, size, 0.5, 0.5)
self.assertRaises(TypeError, audioop.tostereo, data, size, 0.5, 0.5)
self.assertRaises(TypeError, audioop.add, data, data, size)
self.assertRaises(TypeError, audioop.bias, data, size, 0)
self.assertRaises(TypeError, audioop.reverse, data, size)
self.assertRaises(TypeError, audioop.lin2lin, data, size, size)
self.assertRaises(TypeError, audioop.ratecv, data, size, 1, 1, 1, None)
self.assertRaises(TypeError, audioop.lin2ulaw, data, size)
self.assertRaises(TypeError, audioop.lin2alaw, data, size)
self.assertRaises(TypeError, audioop.lin2adpcm, data, size, None)
def test_issue7673(self):
    """Check that every audioop routine rejects the malformed fragments.

    Each ``(data, size)`` pair from ``INVALID_DATA`` is passed to every
    operation listed below, and each call must raise ``audioop.error``.
    """
    no_state = None
    for fragment, width in INVALID_DATA:
        target_width = width
        # (callable, positional arguments), in the order they are exercised.
        rejected_calls = (
            (audioop.getsample, (fragment, width, 0)),
            (audioop.max, (fragment, width)),
            (audioop.minmax, (fragment, width)),
            (audioop.avg, (fragment, width)),
            (audioop.rms, (fragment, width)),
            (audioop.avgpp, (fragment, width)),
            (audioop.maxpp, (fragment, width)),
            (audioop.cross, (fragment, width)),
            (audioop.mul, (fragment, width, 1.0)),
            (audioop.tomono, (fragment, width, 0.5, 0.5)),
            (audioop.tostereo, (fragment, width, 0.5, 0.5)),
            (audioop.add, (fragment, fragment, width)),
            (audioop.bias, (fragment, width, 0)),
            (audioop.reverse, (fragment, width)),
            (audioop.lin2lin, (fragment, width, target_width)),
            (audioop.ratecv, (fragment, width, 1, 1, 1, no_state)),
            (audioop.lin2ulaw, (fragment, width)),
            (audioop.lin2alaw, (fragment, width)),
            (audioop.lin2adpcm, (fragment, width, no_state)),
        )
        for operation, arguments in rejected_calls:
            self.assertRaises(audioop.error, operation, *arguments)
def test_issue7673(self):
    """Check that every audioop routine rejects the malformed fragments.

    Each ``(data, size)`` pair from ``INVALID_DATA`` is passed to every
    operation listed below, and each call must raise ``audioop.error``.
    """
    no_state = None
    for fragment, width in INVALID_DATA:
        target_width = width
        # (callable, positional arguments), in the order they are exercised.
        rejected_calls = (
            (audioop.getsample, (fragment, width, 0)),
            (audioop.max, (fragment, width)),
            (audioop.minmax, (fragment, width)),
            (audioop.avg, (fragment, width)),
            (audioop.rms, (fragment, width)),
            (audioop.avgpp, (fragment, width)),
            (audioop.maxpp, (fragment, width)),
            (audioop.cross, (fragment, width)),
            (audioop.mul, (fragment, width, 1.0)),
            (audioop.tomono, (fragment, width, 0.5, 0.5)),
            (audioop.tostereo, (fragment, width, 0.5, 0.5)),
            (audioop.add, (fragment, fragment, width)),
            (audioop.bias, (fragment, width, 0)),
            (audioop.reverse, (fragment, width)),
            (audioop.lin2lin, (fragment, width, target_width)),
            (audioop.ratecv, (fragment, width, 1, 1, 1, no_state)),
            (audioop.lin2ulaw, (fragment, width)),
            (audioop.lin2alaw, (fragment, width)),
            (audioop.lin2adpcm, (fragment, width, no_state)),
        )
        for operation, arguments in rejected_calls:
            self.assertRaises(audioop.error, operation, *arguments)
def test_string(self):
data = 'abcd'
size = 2
self.assertRaises(TypeError, audioop.getsample, data, size, 0)
self.assertRaises(TypeError, audioop.max, data, size)
self.assertRaises(TypeError, audioop.minmax, data, size)
self.assertRaises(TypeError, audioop.avg, data, size)
self.assertRaises(TypeError, audioop.rms, data, size)
self.assertRaises(TypeError, audioop.avgpp, data, size)
self.assertRaises(TypeError, audioop.maxpp, data, size)
self.assertRaises(TypeError, audioop.cross, data, size)
self.assertRaises(TypeError, audioop.mul, data, size, 1.0)
self.assertRaises(TypeError, audioop.tomono, data, size, 0.5, 0.5)
self.assertRaises(TypeError, audioop.tostereo, data, size, 0.5, 0.5)
self.assertRaises(TypeError, audioop.add, data, data, size)
self.assertRaises(TypeError, audioop.bias, data, size, 0)
self.assertRaises(TypeError, audioop.reverse, data, size)
self.assertRaises(TypeError, audioop.lin2lin, data, size, size)
self.assertRaises(TypeError, audioop.ratecv, data, size, 1, 1, 1, None)
self.assertRaises(TypeError, audioop.lin2ulaw, data, size)
self.assertRaises(TypeError, audioop.lin2alaw, data, size)
self.assertRaises(TypeError, audioop.lin2adpcm, data, size, None)
def __db_level(self, rms_mode=False):
"""
Returns the average audio volume level measured in dB (range -60 db to 0 db)
If the sample is stereo, you get back a tuple: (left_level, right_level)
If the sample is mono, you still get a tuple but both values will be the same.
This method is probably only useful if processed on very short sample fragments in sequence,
so the db levels could be used to show a level meter for the duration of the sample.
"""
maxvalue = 2**(8*self.__samplewidth-1)
if self.nchannels == 1:
if rms_mode:
peak_left = peak_right = (audioop.rms(self.__frames, self.__samplewidth)+1)/maxvalue
else:
peak_left = peak_right = (audioop.max(self.__frames, self.__samplewidth)+1)/maxvalue
else:
left_frames = audioop.tomono(self.__frames, self.__samplewidth, 1, 0)
right_frames = audioop.tomono(self.__frames, self.__samplewidth, 0, 1)
if rms_mode:
peak_left = (audioop.rms(left_frames, self.__samplewidth)+1)/maxvalue
peak_right = (audioop.rms(right_frames, self.__samplewidth)+1)/maxvalue
else:
peak_left = (audioop.max(left_frames, self.__samplewidth)+1)/maxvalue
peak_right = (audioop.max(right_frames, self.__samplewidth)+1)/maxvalue
# cut off at the bottom at -60 instead of all the way down to -infinity
return max(20.0*math.log(peak_left, 10), -60.0), max(20.0*math.log(peak_right, 10), -60.0)
def adjust_for_ambient_noise(self, source, duration = 1):
    """
    Calibrates ``self.energy_threshold`` against the energy of the audio read from ``source`` (an ``AudioSource`` instance).

    Buffers of ``source.CHUNK`` frames are read from ``source.stream`` (overflow errors suppressed) for as long as the accumulated buffer time does not exceed ``duration`` seconds. After every buffer the threshold is moved towards ``energy * self.dynamic_energy_ratio`` by a weighted average whose damping factor is scaled to the buffer length.
    """
    assert isinstance(source, AudioSource), "Source must be an audio source"
    assert self.pause_threshold >= self.non_speaking_duration >= 0

    chunk_seconds = (source.CHUNK + 0.0) / source.SAMPLE_RATE
    listened = chunk_seconds

    # keep sampling while the next buffer still fits into the requested duration
    while not listened > duration:
        chunk = source.stream.read(source.CHUNK, exception_on_overflow=False)
        level = audioop.rms(chunk, source.SAMPLE_WIDTH)  # energy of the audio signal

        # the damping exponent accounts for different chunk sizes and rates
        damping = self.dynamic_energy_adjustment_damping ** chunk_seconds
        goal = level * self.dynamic_energy_ratio
        self.energy_threshold = self.energy_threshold * damping + goal * (1 - damping)

        listened += chunk_seconds
def adjust_for_ambient_noise(self, source, duration = 1):
    """
    Calibrates ``self.energy_threshold`` against the energy of the audio read from ``source`` (an ``AudioSource`` instance whose stream is already open).

    Buffers of ``source.CHUNK`` frames are read from ``source.stream`` for as long as the accumulated buffer time does not exceed ``duration`` seconds. After every buffer the threshold is moved towards ``energy * self.dynamic_energy_ratio`` by a weighted average whose damping factor is scaled to the buffer length.
    """
    assert isinstance(source, AudioSource), "Source must be an audio source"
    assert source.stream is not None, "Audio source must be entered before adjusting, see documentation for `AudioSource`; are you using `source` outside of a `with` statement?"
    assert self.pause_threshold >= self.non_speaking_duration >= 0

    chunk_seconds = (source.CHUNK + 0.0) / source.SAMPLE_RATE
    listened = chunk_seconds

    # keep sampling while the next buffer still fits into the requested duration
    while not listened > duration:
        chunk = source.stream.read(source.CHUNK)
        level = audioop.rms(chunk, source.SAMPLE_WIDTH)  # energy of the audio signal

        # the damping exponent accounts for different chunk sizes and rates
        damping = self.dynamic_energy_adjustment_damping ** chunk_seconds
        goal = level * self.dynamic_energy_ratio
        self.energy_threshold = self.energy_threshold * damping + goal * (1 - damping)

        listened += chunk_seconds
def _calc_energy(self, sound_chunk):
"""Calculates how loud the sound is"""
return audioop.rms(sound_chunk, self.sample_width)
def getScore(self, data):
    """Score ``data`` as one third of its RMS, reading it as 2-byte samples."""
    return audioop.rms(data, 2) / 3
def test_rms(self):
    """Check that the first three fragments in ``data`` have an RMS of 1 at widths 1, 2 and 4."""
    for index, width in enumerate((1, 2, 4)):
        self.assertEqual(audioop.rms(data[index], width), 1)
def put(self, data):
    """Print the RMS of ``data``, reading it as 2-byte samples."""
    level = audioop.rms(data, 2)
    print('RMS: {}'.format(level))
def test_rms(self):
    """Check audioop.rms() on empty, ramp, extreme-value and reference fragments.

    Widths 1, 2 and 4 are covered; the packing helpers and expected
    extremes come from ``packs``, ``maxvalues``, ``minvalues`` and ``datas``.
    """
    for width in (1, 2, 4):
        self.assertEqual(audioop.rms(b'', width), 0)
        pack = packs[width]
        ramp = pack(*range(100))
        self.assertEqual(audioop.rms(ramp, width), 57)
        # A constant signal at either extreme has an RMS equal to its magnitude.
        top = maxvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(top) * 5, width),
                               top, delta=1)
        bottom = minvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(bottom) * 5, width),
                               -bottom, delta=1)
    reference_results = ((1, 77), (2, 20001), (4, 1310854152))
    for width, expected in reference_results:
        self.assertEqual(audioop.rms(datas[width], width), expected)
def test_rms(self):
    """Check audioop.rms() on empty, ramp, extreme-value and reference fragments.

    Widths 1, 2 and 4 are covered; the packing helpers and expected
    extremes come from ``packs``, ``maxvalues``, ``minvalues`` and ``datas``.
    """
    for width in (1, 2, 4):
        self.assertEqual(audioop.rms(b'', width), 0)
        pack = packs[width]
        ramp = pack(*range(100))
        self.assertEqual(audioop.rms(ramp, width), 57)
        # A constant signal at either extreme has an RMS equal to its magnitude.
        top = maxvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(top) * 5, width),
                               top, delta=1)
        bottom = minvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(bottom) * 5, width),
                               -bottom, delta=1)
    reference_results = ((1, 77), (2, 20001), (4, 1310854152))
    for width, expected in reference_results:
        self.assertEqual(audioop.rms(datas[width], width), expected)
def get_score(self, data):
    """Score ``data`` as one third of its RMS, reading it as 2-byte samples."""
    return audioop.rms(data, 2) / 3
def test_rms(self):
    """Check audioop.rms() on empty, ramp, extreme-value and reference fragments.

    Widths 1, 2 and 4 are covered; the packing helpers and expected
    extremes come from ``packs``, ``maxvalues``, ``minvalues`` and ``datas``.
    """
    for width in (1, 2, 4):
        self.assertEqual(audioop.rms(b'', width), 0)
        pack = packs[width]
        ramp = pack(*range(100))
        self.assertEqual(audioop.rms(ramp, width), 57)
        # A constant signal at either extreme has an RMS equal to its magnitude.
        top = maxvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(top) * 5, width),
                               top, delta=1)
        bottom = minvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(bottom) * 5, width),
                               -bottom, delta=1)
    reference_results = ((1, 77), (2, 20001), (4, 1310854152))
    for width, expected in reference_results:
        self.assertEqual(audioop.rms(datas[width], width), expected)
def check_squelch(level, is_triggered, chunks):
    """Compare the median RMS of ``chunks`` against the squelch ``level``.

    Each chunk supplies its bytes as ``.audio`` and its sample width as
    ``.width``.  When ``is_triggered`` is true the result stays ``True``
    unless the median falls below 80% of ``level``; otherwise the result
    is ``True`` only when the median exceeds ``level``.
    """
    ordered = sorted(audioop.rms(chunk.audio, chunk.width) for chunk in chunks)
    median_rms = ordered[int(len(ordered) * .5)]
    if is_triggered:
        return not median_rms < (level * .8)
    return bool(median_rms > level)
def test_rms(self):
    """Check audioop.rms() on empty, ramp, extreme-value and reference fragments.

    Widths 1, 2 and 4 are covered; the packing helpers and expected
    extremes come from ``packs``, ``maxvalues``, ``minvalues`` and ``datas``.
    """
    for width in (1, 2, 4):
        self.assertEqual(audioop.rms(b'', width), 0)
        pack = packs[width]
        ramp = pack(*range(100))
        self.assertEqual(audioop.rms(ramp, width), 57)
        # A constant signal at either extreme has an RMS equal to its magnitude.
        top = maxvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(top) * 5, width),
                               top, delta=1)
        bottom = minvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(bottom) * 5, width),
                               -bottom, delta=1)
    reference_results = ((1, 77), (2, 20001), (4, 1310854152))
    for width, expected in reference_results:
        self.assertEqual(audioop.rms(datas[width], width), expected)
def test_rms(self):
    """Check audioop.rms() on empty, ramp, extreme-value and reference fragments.

    Widths 1 through 4 are covered, and the empty input is tried as
    ``bytes``, ``bytearray`` and ``memoryview``.  The packing helpers and
    expected extremes come from ``packs``, ``maxvalues``, ``minvalues``
    and ``datas``.
    """
    for width in (1, 2, 3, 4):
        for empty in (b'', bytearray(), memoryview(b'')):
            self.assertEqual(audioop.rms(empty, width), 0)
        pack = packs[width]
        ramp = pack(*range(100))
        self.assertEqual(audioop.rms(ramp, width), 57)
        # A constant signal at either extreme has an RMS equal to its magnitude.
        top = maxvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(top) * 5, width),
                               top, delta=1)
        bottom = minvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(bottom) * 5, width),
                               -bottom, delta=1)
    reference_results = ((1, 77), (2, 20001), (3, 5120523), (4, 1310854152))
    for width, expected in reference_results:
        self.assertEqual(audioop.rms(datas[width], width), expected)
def _snr(self, frames):
rms = audioop.rms(b''.join(frames), int(self._input_bits / 8))
if rms > 0 and self._threshold > 0:
return 20.0 * math.log(rms / self._threshold, 10)
else:
return 0
def test_rms(self):
    """Check audioop.rms() on empty, ramp, extreme-value and reference fragments.

    Widths 1, 2 and 4 are covered; the packing helpers and expected
    extremes come from ``packs``, ``maxvalues``, ``minvalues`` and ``datas``.
    """
    for width in (1, 2, 4):
        self.assertEqual(audioop.rms(b'', width), 0)
        pack = packs[width]
        ramp = pack(*range(100))
        self.assertEqual(audioop.rms(ramp, width), 57)
        # A constant signal at either extreme has an RMS equal to its magnitude.
        top = maxvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(top) * 5, width),
                               top, delta=1)
        bottom = minvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(bottom) * 5, width),
                               -bottom, delta=1)
    reference_results = ((1, 77), (2, 20001), (4, 1310854152))
    for width, expected in reference_results:
        self.assertEqual(audioop.rms(datas[width], width), expected)
def test_rms(self):
    """Check audioop.rms() on empty, ramp, extreme-value and reference fragments.

    Widths 1 through 4 are covered, and the empty input is tried as
    ``bytes``, ``bytearray`` and ``memoryview``.  The packing helpers and
    expected extremes come from ``packs``, ``maxvalues``, ``minvalues``
    and ``datas``.
    """
    for width in (1, 2, 3, 4):
        for empty in (b'', bytearray(), memoryview(b'')):
            self.assertEqual(audioop.rms(empty, width), 0)
        pack = packs[width]
        ramp = pack(*range(100))
        self.assertEqual(audioop.rms(ramp, width), 57)
        # A constant signal at either extreme has an RMS equal to its magnitude.
        top = maxvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(top) * 5, width),
                               top, delta=1)
        bottom = minvalues[width]
        self.assertAlmostEqual(audioop.rms(pack(bottom) * 5, width),
                               -bottom, delta=1)
    reference_results = ((1, 77), (2, 20001), (3, 5120523), (4, 1310854152))
    for width, expected in reference_results:
        self.assertEqual(audioop.rms(datas[width], width), expected)
def rms(self):
    """Return the RMS of this object's frames at its sample width."""
    frames, width = self.__frames, self.samplewidth
    return audioop.rms(frames, width)
def _apiai_stt(self):
    """Stream microphone audio into an API.AI voice request.

    A PyAudio input stream is opened with the ``CHANNELS``, ``RATE`` and
    ``CHUNK`` values from ``settings``.  Each captured buffer is resampled,
    run through the voice activity detector and sent to the voice request.
    The stream callback keeps the stream running while the detector
    reports state 1 and signals completion otherwise; this method blocks
    until the stream is no longer active.

    The stream and the PyAudio instance are released even if an error
    occurs while recording.
    """
    from math import log
    import audioop
    import pyaudio
    import time
    resampler = apiai.Resampler(source_samplerate=settings['RATE'])
    request = self.ai.voice_request()
    vad = apiai.VAD()

    def callback(in_data, frame_count, time_info=None, status=None):
        # PyAudio calls stream callbacks with four positional arguments
        # (in_data, frame_count, time_info, status_flags); the last two are
        # accepted here so the call does not fail, but they are not used.
        frames, data = resampler.resample(in_data, frame_count)
        if settings.show_decibels:
            # +1 keeps the logarithm defined for a silent buffer.
            decibel = 20 * log(audioop.rms(data, 2) + 1, 10)
            click.echo(decibel)
        state = vad.processFrame(frames)
        request.send(data)
        state_signal = pyaudio.paContinue if state == 1 else pyaudio.paComplete
        return in_data, state_signal

    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=pyaudio.paInt32, input=True, output=False, stream_callback=callback,
                        channels=settings['CHANNELS'], rate=settings['RATE'], frames_per_buffer=settings['CHUNK'])
        try:
            stream.start_stream()
            click.echo("Speak!")
            # The callback ends the stream; poll until that has happened.
            while stream.is_active():
                time.sleep(0.1)
            stream.stop_stream()
        finally:
            stream.close()
    finally:
        p.terminate()
def check_silence(self, buf):
# Classify one audio buffer as loud or quiet by comparing its RMS (read as
# 2-byte samples) with config.THRESHOLD, and update the append-mode state.
# NOTE(review): the indentation of this listing was lost, so the exact
# nesting of the statements below cannot be read off the text. The comments
# describe what each statement does, not which branch encloses it - confirm
# against the original source before restructuring.
volume = audioop.rms(buf, 2)
# Loud buffer: remember when sound was last heard.
if (volume >= config.THRESHOLD):
self.silence_timer = time.time()
if (self.append == False):
if (self.hatch.get('debug') == True):
print ('starting append mode')
self.timer = time.time()
# Pass the buffers held in silence_buffer to prepare (with the current
# volume), then empty the silence buffer.
for sbuf in self.silence_buffer:
self.prepare.prepare(sbuf, volume)
self.silence_buffer = [ ]
self.append = True
self.silence_counter = 0
# Quiet buffer: count it and keep it; the oldest entry is dropped once the
# silence buffer holds more than 3 buffers.
else:
self.silence_counter += 1
self.silence_buffer.append(buf)
if (len(self.silence_buffer) > 3):
del self.silence_buffer[0]
# Write the raw buffer to self.out when it is set and not closed.
if (self.out != None and self.out.closed != True):
self.out.write(buf)
if (self.append == True):
self.prepare.prepare(buf, volume)
# While appending in live mode, stop once the last loud buffer is more than
# config.MAX_SILENCE_AFTER_START seconds old ...
if (self.append == True and self.silence_timer > 0
and self.silence_timer + config.MAX_SILENCE_AFTER_START < time.time()
and self.live == True):
self.stop("stop append mode because of silence")
# ... or once self.timer is more than config.MAX_TIME seconds old.
if (self.append == True and self.timer + config.MAX_TIME < time.time()
and self.live == True):
self.stop("stop append mode because time is up")
def rms(audio_fragment, sample_width_in_bytes=2):
    """Return the root mean square of ``audio_fragment`` via ``audioop.rms``.

    The element-wise absolute value of the fragment is what gets handed to
    ``audioop.rms``.  Per the original author's note this is equal to
    ``int(np.sqrt(np.mean(np.square(np.absolute(audio_fragment.astype(float))))))``.
    """
    magnitudes = np.abs(audio_fragment)
    return audioop.rms(magnitudes, sample_width_in_bytes)
def record(self):
    """Record five seconds of microphone audio into ``self.voicefile``.

    Audio is captured as 16-bit stereo at 44100 Hz in chunks of 1024
    frames and written out as a WAV file.  A message is printed for each
    chunk whose RMS exceeds a fixed threshold of 1000.

    The input stream, the PyAudio instance and the WAV file are released
    even when recording or writing fails.  ``print`` is called in the
    parenthesised single-argument form, which produces the same output on
    Python 2 and Python 3.
    """
    FORMAT = pyaudio.paInt16
    CHANNELS = 2
    RATE = 44100
    CHUNK = 1024
    RECORD_SECONDS = 5
    WAVE_OUTPUT_FILENAME = self.voicefile
    threshold = 1000
    frames = []

    audio = pyaudio.PyAudio()
    try:
        # start Recording
        stream = audio.open(format=FORMAT, channels=CHANNELS,
                            rate=RATE, input=True,
                            frames_per_buffer=CHUNK)
        try:
            print("recording...")
            for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
                data = stream.read(CHUNK)
                rms = audioop.rms(data, 2)
                if rms > threshold:
                    print("I am hearing you now")
                # NOTE(review): the listing lost its indentation; every chunk
                # is kept here, not only the loud ones - confirm against the
                # original source.
                frames.append(data)
            print("finished recording")
        finally:
            # stop Recording
            stream.stop_stream()
            stream.close()
        # Queried before terminate() so the PyAudio instance is still live.
        sample_width = audio.get_sample_size(FORMAT)
    finally:
        audio.terminate()

    waveFile = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
    try:
        waveFile.setnchannels(CHANNELS)
        waveFile.setsampwidth(sample_width)
        waveFile.setframerate(RATE)
        waveFile.writeframes(b''.join(frames))
    finally:
        waveFile.close()
def record(self):
    """Record five seconds of microphone audio into ``self.voicefile``.

    Audio is captured as 16-bit stereo at 44100 Hz in chunks of 1024
    frames and written out as a WAV file.  A message is printed for each
    chunk whose RMS exceeds a fixed threshold of 1000.

    The input stream, the PyAudio instance and the WAV file are released
    even when recording or writing fails.  ``print`` is called in the
    parenthesised single-argument form, which produces the same output on
    Python 2 and Python 3.
    """
    FORMAT = pyaudio.paInt16
    CHANNELS = 2
    RATE = 44100
    CHUNK = 1024
    RECORD_SECONDS = 5
    WAVE_OUTPUT_FILENAME = self.voicefile
    threshold = 1000
    frames = []

    audio = pyaudio.PyAudio()
    try:
        # start Recording
        stream = audio.open(format=FORMAT, channels=CHANNELS,
                            rate=RATE, input=True,
                            frames_per_buffer=CHUNK)
        try:
            print("recording...")
            for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
                data = stream.read(CHUNK)
                rms = audioop.rms(data, 2)
                if rms > threshold:
                    print("I am hearing you now")
                # NOTE(review): the listing lost its indentation; every chunk
                # is kept here, not only the loud ones - confirm against the
                # original source.
                frames.append(data)
            print("finished recording")
        finally:
            # stop Recording
            stream.stop_stream()
            stream.close()
        # Queried before terminate() so the PyAudio instance is still live.
        sample_width = audio.get_sample_size(FORMAT)
    finally:
        audio.terminate()

    waveFile = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
    try:
        waveFile.setnchannels(CHANNELS)
        waveFile.setsampwidth(sample_width)
        waveFile.setframerate(RATE)
        waveFile.writeframes(b''.join(frames))
    finally:
        waveFile.close()
def wait_for_keyword(self, keyword=None):
    """Listen on the input device until ``keyword`` has been detected.

    Two daemon worker threads run ``self.check_for_keyword`` on recordings
    pushed to a queue and set an event once the keyword was heard.  This
    method reads frames from ``self._input_device``, starts a recording
    when ``self._snr`` reports at least 10 dB, and hands the recording to
    the workers once it is longer than 20 frames and either the SNR of
    its last 10 frames is at most 3 dB or it has reached 60 frames.
    While nothing is being recorded, ``self._threshold`` is refreshed
    from the most recent frames (a window of at most 30).

    Args:
        keyword: the keyword to wait for; ``self._keyword`` when omitted
            or empty.

    Returns:
        None, either when the keyword event is set or when the input
        device stops yielding frames.
    """
    if not keyword:
        keyword = self._keyword
    # Same width _snr() uses, so threshold and SNR are on the same scale.
    sample_width = int(self._input_bits / 8)
    frame_queue = queue.Queue()
    keyword_uttered = threading.Event()
    # FIXME: not configurable yet
    num_worker_threads = 2
    for _ in range(num_worker_threads):
        t = threading.Thread(target=self.check_for_keyword,
                             args=(frame_queue, keyword_uttered, keyword))
        t.daemon = True
        t.start()
    frames = collections.deque([], 30)
    recording = False
    recording_frames = []
    self._logger.info("Waiting for keyword '%s'...", keyword)
    for frame in self._input_device.record(self._input_chunksize,
                                           self._input_bits,
                                           self._input_channels,
                                           self._input_rate):
        if keyword_uttered.is_set():
            self._logger.info("Keyword %s has been uttered", keyword)
            return
        frames.append(frame)
        if not recording:
            snr = self._snr([frame])
            if snr >= 10:  # 10dB
                # Loudness is higher than normal, start recording and use
                # the last 10 frames to start
                self._logger.debug("Started recording on device '%s'",
                                   self._input_device.slug)
                self._logger.debug("Triggered on SNR of %sdB", snr)
                recording = True
                recording_frames = list(frames)[-10:]
            elif len(frames) >= frames.maxlen:
                # Threshold SNR not reached. Update threshold with
                # background noise.
                # Frames are byte strings, so they must be joined with a
                # bytes separator; a text separator raises TypeError on
                # Python 3.
                self._threshold = float(
                    audioop.rms(b"".join(frames), sample_width))
        else:
            # We're recording
            recording_frames.append(frame)
            if len(recording_frames) > 20:
                # If we recorded at least 20 frames, check if we're below
                # threshold again
                last_snr = self._snr(recording_frames[-10:])
                self._logger.debug(
                    "Recording's SNR dB: %f", last_snr)
                if last_snr <= 3 or len(recording_frames) >= 60:
                    # The loudness of the sound is not at least as high as
                    # the the threshold, or we've been waiting too long
                    # we'll stop recording now
                    recording = False
                    self._logger.debug("Recorded %d frames",
                                       len(recording_frames))
                    frame_queue.put(tuple(recording_frames))
                    self._threshold = float(
                        audioop.rms(b"".join(frames), sample_width))