Skip to content
Open
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ ipython_config.py
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
.conda
bootstrap_requirements.txt
environment.yml

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
Expand Down
417 changes: 417 additions & 0 deletions docs/examples/Amplitude_LUT_Example.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/examples/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ Broadbean Examples
Making_output_for_Tektronix_AWG70000A.ipynb
Example_Write_Read_JSON.ipynb
Filter_compensation.ipynb
Amplitude_LUT_Example.ipynb
Subsequences.ipynb
18 changes: 18 additions & 0 deletions src/broadbean/ripasso.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,21 @@ def applyCustomTransferFunction(signal, SR, tf_freqs, tf_amp, invert=False):
signal_filtered = np.real(signal_filtered)

return signal_filtered


def applyAmplitudeLUT(signal, lut):
Comment thread
bennthomsen marked this conversation as resolved.
Outdated
"""
Apply an amplitude LUT to the signal.

Args:
signal (np.array): The input signal. The signal is assumed to have
values in the range [-1, 1].
lut (np.array): The amplitude LUT. Should be a 1D array of length N,
Comment thread
bennthomsen marked this conversation as resolved.
Outdated
where N is the number of entries in the LUT.

Returns:
np.array:
The signal after applying the amplitude LUT.
"""

return np.interp(signal, lut["LUT_input"], lut["LUT_output"])
50 changes: 48 additions & 2 deletions src/broadbean/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from broadbean.blueprint import BluePrint
from broadbean.element import Element # TODO: change import to element.py
from broadbean.ripasso import applyInverseRCFilter
from broadbean.ripasso import applyAmplitudeLUT, applyInverseRCFilter

from .broadbean import (
PulseAtoms,
Expand Down Expand Up @@ -385,6 +385,30 @@ def setChannelFilterCompensation(
"tau": tau,
}

def setAmplitudeLUT(
self, channel: int | str, lut_input: list[float], lut_output: list[float]
) -> None:
"""
Set an amplitude lookup table for a channel. This is used when making
output for .awg files. The LUT is applied after all other waveform
processing.

Args:
channel: The channel number/name
lut_input: The input levels for the LUT
lut_output: The output levels for the LUT

Example:
>>> lut_input = [-1.0, -0.5, 0.0, 0.5, 1.0]
>>> lut_output = [-0.8, -0.3, 0.0, 0.3, 0.8]
>>> seq.setAmplitudeLUT(1, lut_input, lut_output)
"""

self._awgspecs[f"channel{channel}_amplitude_LUT"] = {
"LUT_input": lut_input,
"LUT_output": lut_output,
}

def addElement(self, position: int, element: Element) -> None:
"""
Add an element to the sequence. Overwrites previous values.
Expand Down Expand Up @@ -841,6 +865,19 @@ def forge(
output[pos1]["content"][pos2]["data"][channame]["wfm"]
) = postfilter

# Apply amplitude LUT if present
lut_key = f"channel{channame}_amplitude_LUT"
if lut_key in self._awgspecs.keys():
lut = self._awgspecs[lut_key]
output[pos1]["content"][pos2]["data"][channame]["wfm"] = (
applyAmplitudeLUT(
output[pos1]["content"][pos2]["data"][channame][
"wfm"
],
lut,
)
)

return output

def _prepareForOutputting(self) -> list[dict[int, Any]]:
Expand Down Expand Up @@ -954,6 +991,15 @@ def _prepareForOutputting(self) -> list[dict[int, Any]]:
)
elements[pos][chan]["wfm"] = postfilter

# Apply amplitude LUT if present
lut_key = f"channel{chan}_amplitude_LUT"
if lut_key in self._awgspecs.keys():
lut = self._awgspecs[lut_key]
for pos in range(seqlen):
elements[pos][chan]["wfm"] = applyAmplitudeLUT(
elements[pos][chan]["wfm"], lut
)

return elements

def outputForSEQXFile(
Expand Down Expand Up @@ -1157,7 +1203,7 @@ def outputForSEQXFileWithFlags(
flags_pos = []
for pos in range(1, seqlen + 1):
if "flags" in elements[pos - 1][chan]:
flags = elements[pos - 1][chan]["flags"].tolist()
flags = elements[pos - 1][chan]["flags"]
Comment thread
bennthomsen marked this conversation as resolved.
Outdated
else:
flags = [0, 0, 0, 0]
flags_pos.append(flags)
Expand Down
239 changes: 239 additions & 0 deletions tests/test_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,3 +468,242 @@ def test_write_read_sequence(protosequence1, protosequence2, tmp_path):
seq.write_to_json(os.path.join(d, "Seq.json"))
readbackseq = Sequence.init_from_json(os.path.join(d, "Seq.json"))
assert seq == readbackseq


##################################################
# Amplitude LUT Tests


def test_setAmplitudeLUT():
"""Test the setAmplitudeLUT method and its application in _prepareForOutputting"""

# Create a simple sequence with a known waveform
SR = 1e6

# Create a simple ramp blueprint
bp = bb.BluePrint()
bp.insertSegment(0, ramp, args=(-0.5, 0.5), name="test_ramp", dur=10e-6)
bp.setSR(SR)

# Create element and sequence
elem = bb.Element()
elem.addBluePrint(1, bp)

seq = Sequence()
seq.addElement(1, elem)
seq.setSR(SR)

# Set required channel parameters for _prepareForOutputting
seq.setChannelAmplitude(1, 2.0) # 2V peak-to-peak
seq.setChannelOffset(1, 0.0) # 0V offset

# Test that LUT is properly stored
lut_input = [-1.0, -0.5, 0.0, 0.5, 1.0]
lut_output = [-0.8, -0.3, 0.0, 0.4, 0.9] # Non-linear mapping

seq.setAmplitudeLUT(1, lut_input, lut_output)

# Verify the LUT is stored correctly
expected_lut = {"LUT_input": lut_input, "LUT_output": lut_output}
assert seq._awgspecs["channel1_amplitude_LUT"] == expected_lut

# Test _prepareForOutputting applies the LUT
elements = seq._prepareForOutputting()

# Get the original waveform without LUT for comparison
seq_no_lut = Sequence()
seq_no_lut.addElement(1, elem)
seq_no_lut.setSR(SR)
seq_no_lut.setChannelAmplitude(1, 2.0)
seq_no_lut.setChannelOffset(1, 0.0)

elements_no_lut = seq_no_lut._prepareForOutputting()

# Verify that the LUT was applied
original_wfm = elements_no_lut[0][1]["wfm"]
lut_applied_wfm = elements[0][1]["wfm"]

# The waveforms should be different due to LUT application
assert not np.array_equal(original_wfm, lut_applied_wfm)

# Verify the LUT transformation is correct by checking specific points
# For a ramp from -0.5 to 0.5, we can verify the interpolation
expected_transformed = np.interp(original_wfm, lut_input, lut_output)
assert np.allclose(lut_applied_wfm, expected_transformed)

# Test edge cases: verify LUT works for boundary values
# The ramp goes from -0.5 to 0.4 (10 points, endpoint not included)
# so we should see the corresponding LUT output values at the boundaries
min_val = np.min(lut_applied_wfm)
max_val = np.max(lut_applied_wfm)

# Check that the min/max values are approximately what we expect from LUT
# Original ramp: -0.5 to 0.4 -> LUT maps -0.5 to -0.3, and 0.4 to 0.32
assert np.isclose(min_val, -0.3, rtol=1e-10)
assert np.isclose(max_val, 0.32, rtol=1e-10)

# Test the forge() method with amplitude LUT
forged_output = seq.forge()

# Verify structure of forged output
assert 1 in forged_output # Position 1 exists
assert forged_output[1]["type"] == "element"
assert "content" in forged_output[1]
assert 1 in forged_output[1]["content"] # Sub-position 1 exists
assert "data" in forged_output[1]["content"][1]
assert 1 in forged_output[1]["content"][1]["data"] # Channel 1 exists
assert "wfm" in forged_output[1]["content"][1]["data"][1]

# Extract the LUT-applied waveform from forge() output
forged_wfm = forged_output[1]["content"][1]["data"][1]["wfm"]

# The forged waveform should be identical to the _prepareForOutputting result
assert np.allclose(forged_wfm, lut_applied_wfm)

# Verify the LUT was applied correctly in forge() as well
expected_forged = np.interp(original_wfm, lut_input, lut_output)
assert np.allclose(forged_wfm, expected_forged)


def test_setAmplitudeLUT_multiple_channels():
"""Test that amplitude LUT works correctly with multiple channels"""

SR = 1e6

# Create blueprints for two channels
bp1 = bb.BluePrint()
bp1.insertSegment(0, ramp, args=(-1.0, 1.0), name="ramp_channel_a", dur=5e-6)
bp1.setSR(SR)

bp2 = bb.BluePrint()
bp2.insertSegment(0, sine, args=(1e5, 0.5, 0, 0), name="sine_channel_b", dur=5e-6)
bp2.setSR(SR)

# Create element and sequence
elem = bb.Element()
elem.addBluePrint(1, bp1)
elem.addBluePrint(2, bp2)

seq = Sequence()
seq.addElement(1, elem)
seq.setSR(SR)

# Set channel parameters
seq.setChannelAmplitude(1, 2.0)
seq.setChannelOffset(1, 0.0)
seq.setChannelAmplitude(2, 1.0)
seq.setChannelOffset(2, 0.0)

# Set different LUTs for each channel
lut1_input = [-1.0, 0.0, 1.0]
lut1_output = [-0.5, 0.0, 0.8] # Asymmetric compression

lut2_input = [-1.0, 0.0, 1.0]
lut2_output = [-1.2, 0.0, 1.2] # Expansion

seq.setAmplitudeLUT(1, lut1_input, lut1_output)
seq.setAmplitudeLUT(2, lut2_input, lut2_output)

# Test _prepareForOutputting
elements = seq._prepareForOutputting()

# Verify both LUTs are applied correctly
wfm1 = elements[0][1]["wfm"]
wfm2 = elements[0][2]["wfm"]

# Channel 1 should have compressed range
assert np.max(wfm1) <= 0.8
assert np.min(wfm1) >= -0.5

# Channel 2 should have expanded range (beyond original sine amplitude)
# The original sine only goes from 0 to ~0.48, so with expansion LUT it should exceed that
original_max = 0.48 # Approximate max from our sine wave
assert np.max(wfm2) > original_max # Should be expanded

# Verify LUT is actually applied by checking that values are different from input range
# The LUT should map values according to the interpolation table
assert (
np.min(wfm2) >= 0.0
) # Should still be non-negative since original sine is non-negative

# Test the forge() method with multiple channel LUTs
forged_output = seq.forge()

# Verify structure of forged output for multiple channels
assert 1 in forged_output # Position 1 exists
assert forged_output[1]["type"] == "element"
forged_data = forged_output[1]["content"][1]["data"]
assert 1 in forged_data # Channel 1 exists
assert 2 in forged_data # Channel 2 exists

# Extract the LUT-applied waveforms from forge() output
forged_wfm1 = forged_data[1]["wfm"]
forged_wfm2 = forged_data[2]["wfm"]

# The forged waveforms should be identical to the _prepareForOutputting results
assert np.allclose(forged_wfm1, wfm1)
assert np.allclose(forged_wfm2, wfm2)

# Verify the LUTs were applied correctly in forge() as well
# Channel 1 should have compressed range
assert np.max(forged_wfm1) <= 0.8
assert np.min(forged_wfm1) >= -0.5

# Channel 2 should have expanded range
original_max = 0.48 # Approximate max from our sine wave
assert np.max(forged_wfm2) > original_max # Should be expanded
assert np.min(forged_wfm2) >= 0.0 # Should still be non-negative


def test_setAmplitudeLUT_no_lut():
"""Test that _prepareForOutputting works correctly when no LUT is set"""

SR = 1e6

bp = bb.BluePrint()
bp.insertSegment(0, ramp, args=(0.0, 1.0), name="test_ramp", dur=5e-6)
bp.setSR(SR)

elem = bb.Element()
elem.addBluePrint(1, bp)

seq = Sequence()
seq.addElement(1, elem)
seq.setSR(SR)
seq.setChannelAmplitude(1, 2.0)
seq.setChannelOffset(1, 0.0)

# Don't set any LUT
elements = seq._prepareForOutputting()

# Should work without errors
assert len(elements) == 1
assert 1 in elements[0]
assert "wfm" in elements[0][1]

# Waveform should be the original ramp
wfm = elements[0][1]["wfm"]
assert np.min(wfm) >= 0.0
assert np.max(wfm) <= 1.0

# Test the forge() method when no LUT is set
forged_output = seq.forge()

# Verify structure of forged output
assert 1 in forged_output # Position 1 exists
assert forged_output[1]["type"] == "element"
assert "content" in forged_output[1]
assert 1 in forged_output[1]["content"] # Sub-position 1 exists
assert "data" in forged_output[1]["content"][1]
assert 1 in forged_output[1]["content"][1]["data"] # Channel 1 exists
assert "wfm" in forged_output[1]["content"][1]["data"][1]

# Extract the waveform from forge() output
forged_wfm = forged_output[1]["content"][1]["data"][1]["wfm"]

# The forged waveform should be identical to the _prepareForOutputting result
assert np.allclose(forged_wfm, wfm)

# Waveform should still be the original ramp (no LUT applied)
assert np.min(forged_wfm) >= 0.0
assert np.max(forged_wfm) <= 1.0