Skip to content

Commit c6a72e1

Browse files
committed
FIX #4 Allow for other valid header-names in the sph file
1 parent 8202268 commit c6a72e1

5 files changed

Lines changed: 188 additions & 57 deletions

File tree

pyproject.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[tool.black]
2+
skip-string-normalization=true
3+
exclude = '''
4+
(
5+
/(
6+
\.eggs|
7+
\.git|
8+
\.hg|
9+
\.mypy_cache|
10+
\.nox|
11+
\.tox|
12+
\.venv|
13+
_build|
14+
buck-out|
15+
build|
16+
dist|
17+
protos|
18+
migrations
19+
)/
20+
)
21+
'''

setup.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,12 @@
44
name="sphfile",
55
version="1.0.2",
66
url="https://github.com/mcfletch/sphfile",
7-
87
author="Mike C. Fletcher",
98
author_email="mcfletch@vrplumber.com",
10-
119
description="Numpy-based NIST SPH audio-file reader",
1210
long_description=open('README.rst').read(),
13-
1411
packages=setuptools.find_packages(),
15-
1612
install_requires=[],
17-
1813
classifiers=[
1914
'Programming Language :: Python',
2015
'Programming Language :: Python :: 2',

sphfile/sphfile.py

Lines changed: 94 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22
33
Uses the standard-library's wave module to write the wav files
44
"""
5-
import numpy
5+
import numpy, re
6+
NAME_MATCH = re.compile(r'^[a-zA-Z][a-zA-Z0-9]*([_][a-zA-Z][a-zA-Z0-9]*)*$')
67

7-
def parse_sph_header( fh ):
8+
def parse_sph_header(fh):
89
"""Read the file-format header for an sph file
910
1011
The SPH header-file is exactly 1024 bytes at the head of the file,
@@ -25,34 +26,43 @@ def parse_sph_header( fh ):
2526
}
2627
"""
2728
file_format = {
28-
'sample_rate':8000,
29-
'channel_count':1,
30-
'sample_byte_format': '01', # little-endian
31-
'sample_n_bytes':2,
32-
'sample_sig_bits': 16,
33-
'sample_coding': 'pcm',
29+
'sample_rate': 8000,
30+
'channel_count': 1,
31+
'sample_byte_format': '01', # little-endian
32+
'sample_n_bytes': 2,
33+
'sample_sig_bits': 16,
34+
'sample_coding': 'pcm',
3435
}
3536
end = b'end_head'
3637
for line in fh.read(1024).splitlines():
3738
if line.startswith(end):
38-
break
39+
break
3940
line = line.decode('latin-1')
40-
for key in file_format.keys():
41-
if line.startswith(key):
42-
_, format, value = line.split(None, 3)
43-
if format == '-i':
44-
value = int(value, 10)
45-
file_format[key] = value
41+
try:
42+
key, format, value = line.split(None, 3)
43+
except (ValueError, KeyError, TypeError) as err:
44+
pass
45+
else:
46+
key, format, value = line.split(None, 3)
47+
if not NAME_MATCH.match(key):
48+
# we'll ignore invalid names for now...
49+
continue
50+
if format == '-i':
51+
value = int(value, 10)
52+
file_format[key] = value
4653
return file_format
4754

48-
class SPHFile( object ):
55+
56+
class SPHFile(object):
4957
"""SPH data-file that can is read into RAM on access"""
50-
def __init__( self, filename ):
51-
self.filename = filename
58+
59+
def __init__(self, filename):
60+
self.filename = filename
5261
self._rawbytes = None
53-
def open( self ):
54-
with open( self.filename, 'rb' ) as fh:
55-
self._format = format = parse_sph_header( fh )
62+
63+
def open(self):
64+
with open(self.filename, 'rb') as fh:
65+
self._format = format = parse_sph_header(fh)
5666
content = fh.read()
5767
if format['sample_n_bytes'] == 1:
5868
np_format = numpy.uint8
@@ -61,62 +71,94 @@ def open( self ):
6171
elif format['sample_n_bytes'] == 4:
6272
np_format = numpy.int32
6373
else:
64-
raise RuntimeError( "Unrecognized byte count: %s", format['sample_n_bytes'] )
65-
remainder = len(content)%format['sample_n_bytes']
74+
raise RuntimeError(
75+
"Unrecognized byte count: %s", format['sample_n_bytes']
76+
)
77+
remainder = len(content) % format['sample_n_bytes']
6678
if remainder:
6779
content = content[:-remainder]
6880
self._rawbytes = content
69-
self._content = numpy.fromstring(content,dtype=np_format)
81+
self._content = numpy.frombuffer(content, dtype=np_format)
7082
if self._format['sample_byte_format'] == '10':
7183
# deal with big-endian data-files as wav is going to expect little-endian
7284
self._content = self._content.byteswap()
73-
85+
7486
_format = _content = None
87+
7588
@property
76-
def format( self ):
89+
def format(self):
7790
if self._format is None:
78-
with open( self.filename, 'rb' ) as fh:
79-
self._format = parse_sph_header( fh )
80-
return self._format
91+
with open(self.filename, 'rb') as fh:
92+
self._format = parse_sph_header(fh)
93+
return self._format
94+
8195
@property
82-
def content( self ):
96+
def content(self):
8397
if self._content is None:
8498
self.open()
85-
return self._content
86-
99+
return self._content
100+
87101
def seconds_to_offset(self, seconds):
    """Convert a time in seconds into an index into the sample buffer

    The result is truncated to an int and is scaled by both the
    sample rate and the channel count (the buffer is assumed to hold
    interleaved channels).
    """
    header = self.format
    samples_per_channel = seconds * header['sample_rate']
    return int(samples_per_channel * header['channel_count'])
90-
def time_range(self, start=0, stop=None ):
104+
105+
def time_range(self, start=0, stop=None):
91106
if stop is not None:
92-
return self.content[ self.seconds_to_offset(start):self.seconds_to_offset(stop) ]
107+
return self.content[
108+
self.seconds_to_offset(start) : self.seconds_to_offset(stop)
109+
]
93110
else:
94-
return self.content[ self.seconds_to_offset(start): ]
95-
96-
def write_wav( self, filename, start=None, stop=None ):
111+
return self.content[self.seconds_to_offset(start) :]
112+
113+
def write_wav(self, filename, start=None, stop=None):
97114
"""Write our audio buffer to given filename as a wave-file"""
98115
import wave
99-
with wave.open(filename,'wb') as fh:
116+
117+
with wave.open(filename, 'wb') as fh:
100118
params = (
101-
self.format['channel_count'],
102-
self.format['sample_n_bytes'],
119+
self.format['channel_count'],
120+
self.format['sample_n_bytes'],
103121
self.format['sample_rate'],
104-
0,
105-
'NONE', 'NONE'
122+
0,
123+
'NONE',
124+
'NONE',
106125
)
107126
fh.setparams(params)
108127
if start is not None or stop is not None:
109-
data = self.time_range( start, stop )
128+
data = self.time_range(start, stop)
110129
else:
111130
data = self.content
112-
fh.writeframes( data.tostring() )
131+
fh.writeframes(data.tostring())
113132
return filename
114-
115-
def test():
116-
sph =SPHFile( '/var/datasets/TEDLIUM_release2/test/sph/JamesCameron_2010.sph' )
117-
sph.write_wav( 'test.wav', 111.29, 123.57 )
118-
print("test.wav should say: i had to create these images in my head you know we all did as kids having to read a book and through the author 's description put something on on the screen the movie screen in our heads and so my")
119133

120-
if __name__ == "__main__":
121-
test()
122-
134+
def write_sph(self, filename, start=None, stop=None, extra_headers=None):
    """Write out an SPH data-file with (a subset) of our data

    filename -- path of the file to create (opened in binary mode)
    start, stop -- optional offsets in seconds handed to time_range();
        when both are None the whole content buffer is written
    extra_headers -- optional mapping of additional header fields; it is
        applied last, so it overrides values taken from self.format

    Raises ValueError when the encoded header does not fit into the
    fixed-size 1024 byte header block.

    returns filename
    """
    header_size = 1024
    if start is not None or stop is not None:
        # time_range() multiplies `start`, so None must become 0 here
        data = self.time_range(0 if start is None else start, stop)
    else:
        data = self.content
    params = self.format.copy()
    # NOTE(review): this is the length of the interleaved buffer; confirm
    # whether sample_count should be per-channel for multi-channel files
    params['sample_count'] = len(data)
    if params.get('sample_byte_format') == '10':
        # open() byte-swaps '10' (big-endian) data in memory, so the bytes
        # we write are no longer in the order the source header declares
        params['sample_byte_format'] = '01'
    if extra_headers:
        params.update(extra_headers)

    header_set = [
        'NIST_1A',
        '   1024',
    ]
    for key, value in sorted(params.items()):
        if isinstance(value, (int, numpy.integer)):
            header_set.append('%s -i %d' % (key, value))
        else:
            # Drop non-ASCII characters *before* measuring, so the declared
            # -sN length matches the text that is actually written
            text = str(value).encode('ascii', errors='ignore').decode('ascii')
            header_set.append('%s -s%d %s' % (key, len(text), text))
    header_set.append('end_head')
    header_set.append('')
    headers_encoded = '\n'.join(header_set).encode('ascii', errors='ignore')
    if len(headers_encoded) > header_size:
        raise ValueError(
            'SPH header is %d bytes, which exceeds the %d byte header block'
            % (len(headers_encoded), header_size)
        )
    # The header text goes at the *start* of the block; the remainder is
    # padded with NUL bytes so the sample data begins at byte 1024
    header_block = headers_encoded.ljust(header_size, b'\0')
    with open(filename, 'wb') as fh:
        fh.write(header_block)
        fh.write(data.tobytes())
    return filename
164+

test-requirements.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
pytest
2+
black
3+
-r requirements.txt
4+

tests/test_sphfile.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import unittest, pytest, os, glob
2+
from sphfile import sphfile
3+
import io
4+
5+
HERE = os.path.dirname(os.path.abspath(__file__))
6+
CAMERON_SAMPLE = os.path.join(HERE, 'JamesCameron_2010.sph')
7+
8+
9+
class TestSPHFile(unittest.TestCase):
    """Tests for SPH header parsing and wav extraction"""

    # The mark must be applied as a decorator; as a bare expression it is
    # evaluated and discarded, so the test is never skipped.
    @pytest.mark.skipif(
        not os.path.exists(CAMERON_SAMPLE),
        reason='Missing James Cameron Sample from TEDLium',
    )
    def test_extract(self):
        """Extract a time range from the sample file and check the wav size"""
        sph = sphfile.SPHFile(CAMERON_SAMPLE)
        target = CAMERON_SAMPLE[:-4] + '-extract.wav'
        sph.write_wav(target, 111.29, 123.57)
        print(
            "%s should say:\ni had to create these images in my head you know we all did as kids having to read a book and through the author 's description put something on on the screen the movie screen in our heads and so my"
            % (target,)
        )
        assert os.stat(target).st_size == 393004

    def test_format(self):
        """Every .sph file beside this test exposes the core format keys"""
        for filename in glob.glob(os.path.join(HERE, '*.sph')):
            sph = sphfile.SPHFile(filename)
            assert sph.format
            for key in [
                'sample_rate',
                'channel_count',
                'sample_byte_format',  # little-endian
                'sample_n_bytes',
                'sample_sig_bits',
                'sample_coding',
            ]:
                assert key in sph.format, sph.format

    def test_parse_bad_names(self):
        """Test header parsing ignores fields with invalid names"""
        for header, expected in [
            (
                '''NIST_1A
1024
sample_count -i 16892238
sample_n_bytes -i 2
channel_count -i 1
_this_and_that -s4 that
this__and -i 3
this__ -i 3
sample_byte_format -s2 10
sample_rate -i 16000
sample_coding -s3 pcm
end_head
''',
                {
                    'sample_count': 16892238,
                    'sample_n_bytes': 2,
                    # not present in the header above; expected anyway
                    'sample_sig_bits': 16,
                    'channel_count': 1,
                    'sample_byte_format': '10',
                    'sample_rate': 16000,
                    'sample_coding': 'pcm',
                },
            )
        ]:
            format = sphfile.parse_sph_header(io.BytesIO(header.encode('ascii')))
            assert format == expected, (header, format)

0 commit comments

Comments
 (0)