diff --git a/fault/__init__.py b/fault/__init__.py index 09dbf5a0..c5953ece 100644 --- a/fault/__init__.py +++ b/fault/__init__.py @@ -1,5 +1,6 @@ from .wrapped_internal_port import WrappedVerilogInternalPort from .ms_types import (RealIn, RealOut, RealType, + CurrentIn, CurrentOut, CurrentType, ElectIn, ElectOut, ElectType) from .tester import (Tester, SymbolicTester, PythonTester, TesterBase, SynchronousTester) diff --git a/fault/actions.py b/fault/actions.py index cc6228dd..fd7f2d39 100644 --- a/fault/actions.py +++ b/fault/actions.py @@ -8,6 +8,7 @@ import pysv from fault.select_path import SelectPath import fault.expression as expression +from fault.domain_read import domain_read class Action(ABC): @@ -96,10 +97,12 @@ def is_output(port): return not port.is_output() +@domain_read class GetValue(Action): - def __init__(self, port): + def __init__(self, port, params): super().__init__() self.port = port + self.params = params self.value = None # value to be assigned after simulation @property diff --git a/fault/background_poke.py b/fault/background_poke.py new file mode 100644 index 00000000..585ff29d --- /dev/null +++ b/fault/background_poke.py @@ -0,0 +1,313 @@ +from fault.actions import Poke, Var +import heapq +import math +from functools import total_ordering +import copy +from fault.actions import Delay + + +@total_ordering +class Thread: + # if sin wave dt is unspecified, use period/default_steps_per_cycle + default_steps_per_cycle = 10 + + # when checking clock value at time t, check time t+epsilon instead + # this avoids ambiguity when landing exactly on the clock edge + # Also, used to check whether a step was supposed to have happened exactly + # on a scheduled update, or is too early and we should reschedule + epsilon = 1e-17 + + def __init__(self, time, poke): + # print('creating thread for', poke, 'at time', time) + self.poke = copy.copy(poke) + self.poke.params = None + self.poke.delay = None + params = poke.delay + self.start = time + self.next_update = time + type_ = params.get('type', 'clock') + + # Each type must set a get_val(t) function and a dt + if type_ == 'clock': + freq = params.get('freq', 1e6) + period = params.get('period', 1/freq) + freq = 1 / period + duty_cycle = params.get('duty_cycle', 0.5) + initial_value = params.get('initial_value', 0) + + # had some trouble with epsilon being too small ... I think the + # issue is that the ThreadPool epsilon is in the time domain while + # this one is used in the cycle domain + self.epsilon *= freq + + # The way an edge gets dropped is if the time is 0.999 and we decide + # that's close enough to go to 1.5 next, but we also decide to + # output the value for the 0.5-1 range + def get_val(t): + cycle_location = ((t - self.start) / period) % 1 + edge_loc = (1-duty_cycle) if initial_value == 0 else duty_cycle + # if we're very close to the edge between regions, assume it + # was an attempt to hit the edge, and opt for later region + first_region = (cycle_location + self.epsilon) % 1 <= edge_loc + cycle_dt = ((edge_loc - cycle_location) % 1 if first_region + else (-cycle_location) % 1) + self.dt = cycle_dt * period + return (not first_region) ^ initial_value + + self.get_val = get_val + self.dt = period/2 + + elif type_ == 'sin': + freq = params.get('freq', 1e6) + period = params.get('period', 1/freq) + freq = 1 / period + amplitude = params.get('amplitude', 1) + offset = params.get('offset', 0) + phase_degrees = params.get('phase_degrees', 0) + conv = math.pi / 180.0 + phase_radians = params.get('phase_radians', phase_degrees * conv) + + def get_val(t): + x = math.sin((t-self.start)*freq*2*math.pi + phase_radians) + return amplitude * x + offset + + self.get_val = get_val + self.dt = params.get('dt', 1 / (freq*self.default_steps_per_cycle)) + + elif type_ == 'ramp': + start = params.get('start', 0) + stop = params.get('stop', None) + rate = params.get('rate', 1) # volts per second + etol = params.get('etol', 0.1) + assert etol > 0, 'Ramp error tolerance must be positive' + dt = abs(etol / rate) + + def get_val(t): + x = start + rate * (t - self.start) + if stop is not None: + if (rate > 0 and x > stop) or (rate < 0 and x < stop): + x = stop + self.dt = float('inf') + return x + self.get_val = get_val + self.dt = dt + + elif type_ == 'future': + wait = params.get('wait', None) + waits = params.get('waits', [wait]) + value = params.get('value', poke.value) + values = params.get('values', [value]) + + # for the duration of wait[i], output shoud be value[i] + values = [None] + values + waits = waits + [float('inf')] + + self.future_count = 0 + + def get_val(t): + if t + 2*self.epsilon > self.future_next: + msg = 'Missed update in future background poke' + assert t - self.future_next < self.epsilon, msg + self.future_count += 1 + self.future_next += waits[self.future_count] + self.dt = self.future_next - t + v = values[self.future_count] + + # Presumably this was here for a reason ... but I don't remember + if abs(t-self.next_update) <= self.epsilon: + if (self.future_next - (self.next_update + self.dt) + < -self.epsilon): + pass + else: + if self.future_next - self.next_update < -self.epsilon: + pass + return v + self.get_val = get_val + self.dt = waits[0] + self.future_next = self.start + self.dt + else: + assert False, 'Unrecognized background_poke type '+str(type_) + + def step(self, t): + """ + Returns a new Poke object with the correct value set for time t. + Sets the port and value but NOT the delay. + """ + # TODO don't poke at the same time twice + missed_update_msg = 'Background Poke thread not updated in time' + assert t <= self.next_update + self.epsilon, missed_update_msg + + # must call get_val before calculating next_update because it can + # alter self.dt + value = self.get_val(t) + if abs(t - self.next_update) < 2*self.epsilon: + self.next_update = t + self.dt + + if value is None: + # The actual delay will get set later + return Delay(-1) + else: + poke = copy.copy(self.poke) + poke.value = value + return poke + + def __lt__(self, other): + return self.next_update < other + + +class ThreadPool: + # if the next background update is within epsilon of the next manual + # update, then the background one comes first + # Which comes first is arbitrary, but this epsilon makes it consistent + epsilon = 1e-17 + + def __init__(self, time): + self.t = time + self.background_threads = [] + self.active_ports = set() + + @staticmethod + def set_action_delay(action, delay): + if isinstance(action, Delay): + action.time = delay + else: + action.delay = delay + + def get_next_update_time(self): + if len(self.background_threads) == 0: + return float('inf') + else: + return self.background_threads[0].next_update + + def delay(self, t_delay): + """ + Create a list of actions that need to happen during this delay. + Returns (free_time, actions), where free_time is the delay that should + happen before the first action in actions. + """ + t = self.t + t_end = self.t + t_delay + # note that free_time here is actually the free time until the next + # pool update, the actual time returned is until the next action and + # might be less than free_time + free_time = self.get_next_update_time() - self.t + if free_time > t_delay: + self.t = t_end + return t_delay, [] + else: + actions = [] + t += free_time + while self.get_next_update_time() <= t_end + self.epsilon: + thread = heapq.heappop(self.background_threads) + action = thread.step(t) + heapq.heappush(self.background_threads, thread) + + # we had to put the thread back on the heap in order to + # calculate the next update + next_thing_time = min(self.get_next_update_time(), t_end) + delay = next_thing_time - t + self.set_action_delay(action, delay) + t = next_thing_time + actions.append(action) + + # t_end has less floating point error than t + self.t = t_end + return free_time, actions + + def add(self, background_poke): + error_msg = 'Cannot add existing background thread' + assert background_poke.port not in self.active_ports, error_msg + self.active_ports.add(background_poke.port) + thread = Thread(self.t, background_poke) + heapq.heappush(self.background_threads, thread) + + def remove(self, port): + self.active_ports.remove(port) + for thread in self.background_threads: + if thread.poke.port is port: + offender = thread + break + else: + msg = 'background_poke internal error: Could not find remove port' + assert False, msg + self.background_threads.remove(offender) + heapq.heapify(self.background_threads) + poke = offender.step(self.t) + if poke is None: + # TODO I don't think this path is ever taken? + return [] + else: + self.set_action_delay(poke, 0) + return [poke] + + def process(self, action, delay): + new_action_list = [] + is_background = (isinstance(action, Poke) + and type(action.delay) == dict) + + # check whether this is a poke taking over a background port + # TODO if the port is a Var, just hope it's not taking over + if (isinstance(action, Poke) + and not isinstance(action.port, Var) + and action.port in self.active_ports): + new_action_list += self.remove(action.port) + + # if the new port is background we must add it before doing delay + if is_background: + self.add(action) + + # we might cut action's delay short to allow some background pokes + new_delay, actions = self.delay(delay) + + # now we add this (shortened) action back in + if not is_background: + # NOTE: we used to use copies of the action so we weren't editing + # the delay of an action owned by someone else. But with the new + # GetValue action it's important that the object doesn't change + # because the user is holding a pointer to the old GetValue object + self.set_action_delay(action, new_delay) + new_action_list.append(action) + + new_action_list += actions + return new_action_list + + +def process_action_list(actions, clock_step_delay): + """ + Replace Pokes with background_params with many individual pokes. + Automatically interleaves multiple background tasks with other pokes. + Throws a NotImplementedError if there's a background task during an + interval of time not known at compile time. + """ + + def get_delay(a): + if not hasattr(a, 'delay'): + return getattr(a, 'time', 0) + elif a.delay is not None: + if type(a.delay) == dict: + return 0 + else: + return a.delay + else: + # TODO this case used to use clock_step_delay instead of 0 + # I'm not sure exactly when it's appropriate to do that + return 0 # clock_step_delay + + background_pool = ThreadPool(0) + new_action_list = [] + for action in actions: + delay = get_delay(action) + new_action_list += background_pool.process(action, delay) + return new_action_list + + +def background_poke_target(cls): + class BackgroundPokeTarget(cls): + def run(self, *args, **kwargs): + assert (len(args) > 0 and isinstance(args[0], list), + 'Expected first arg to "target.run" to be an action list') + actions = args[0] + new_actions = process_action_list(actions, self.clock_step_delay) + new_args = (new_actions, *args[1:]) + super().run(*new_args, **kwargs) + return BackgroundPokeTarget diff --git a/fault/domain_read.py b/fault/domain_read.py new file mode 100644 index 00000000..b1a7ed2d --- /dev/null +++ b/fault/domain_read.py @@ -0,0 +1,217 @@ +import numpy as np +from fault.result_parse import SpiceResult, ResultInterp + + +# edge finder is used for measuring phase, freq, etc. +class EdgeNotFoundError(Exception): + pass + + +def get_value_domain(results, action, time, get_name): + style = action.params.pop('style') + # TODO is this the right way to get the name? + try: + res = results[get_name(action.port)] + except KeyError: + res = results[get_name(action.port).lower()] + + if not isinstance(res.t, np.ndarray): + res.t = np.array(res.t) + res.v = np.array(res.v) + + if isinstance(res, ResultInterp): + res_style = 'pwc' + elif isinstance(res, SpiceResult): + res_style = 'spice' + else: + assert False, f'Unrecognized result class {type(res)} for result {res}' + # TODO default height for spice is different and depends on vsup + height = action.params.get('height', 0.5) + if 'height' in action.params: + action.params.pop('height') + if style == 'single': + # equivalent to a regular get_value + value = res(time) + if type(value) == np.ndarray: + value = value.tolist() + action.value = value + elif style == 'edge': + # looking for a nearby rising/falling edge + # look at find_edge for possible parameters + value = find_edge(res_style, res.t, res.v, time, height, + **action.params) + action.value = value + elif style == 'frequency': + # frequency based on the (previous?) two rising edges + edges = find_edge(res_style, res.t, res.v, time, height, count=2) + freq = 1 / (edges[0] - edges[1]) + action.value = freq + elif style == 'phase': + # phase of this signal relative to another + msg = 'Phase read requires reference signal param' + assert 'ref' in action.params, msg + res_ref = results[f'{get_name(action.params["ref"])}'] + ref = find_edge(res_style, res_ref.t, res_ref.v, time, height, count=2) + before_cycle_end = find_edge(res_style, res.t, res.v, time + ref[0], + height) + fraction = 1 + before_cycle_end[0] / (ref[0] - ref[1]) + # TODO multiply by 2pi? + action.value = fraction + elif style == 'block': + # return a whole chunk of the waveform. + # returns (t, v) where t is time relative to the get_value action + assert 'duration' in action.params, 'Block read requires duration' + duration = action.params['duration'] + # make sure to grab points surrounding requested times so user can + # interpolate the exact start and end. + start = max(0, np.argmax(res.t > time) - 1) + end = ((len(res.t) - 1) if res.t[-1] < time + duration + else np.argmax(res.t >= time + duration)) + + t = res.t[start:end] - time + v = res.v[start:end] + action.value = (t, v) + else: + raise NotImplementedError(f'Unknown style "{style}"') + + +def find_edge(res_style, *args, **kwargs): + if res_style == 'spice': + return find_edge_spice(*args, **kwargs) + elif res_style == 'pwc': + return find_edge_pwc(*args, **kwargs) + else: + assert False, f'Unrecognized result style "{res_style}"' + + +def find_edge_pwc(x, y, t_start, height, forward=False, count=1, rising=True): + """ + Search through data (x,y) starting at time t_start for when the + waveform crosses height (defaut is ???). Searches backwards by + default (frequency now is probably based on the last few edges?) + """ + + # deal with `rising` and `forward` + # normally a low-to-high finder + if rising ^ forward: + y = [(-1 * z + 2 * height) for z in y] + direction = 1 if forward else -1 + # we want to start on the far side of the interval containing t_start + # to make sure we catch any edge near t_start + side = 'left' if forward else 'right' + + start_index = np.searchsorted(x, t_start, side=side) + if start_index == len(x): + # happens when forward=False and the edge find is the end of the sim + start_index -= 1 + + i = start_index + edges = [] + while len(edges) < count: + # move until we hit low + while y[i] > height: + i += direction + if i < 0 or i >= len(y): + msg = f'only {len(edges)} of requested {count} edges found' + raise EdgeNotFoundError(msg) + # now move until we hit the high + while y[i] <= height: + i += direction + if i < 0 or i >= len(y): + msg = f'only {len(edges)} of requested {count} edges found' + raise EdgeNotFoundError(msg) + + # moving forward the crossing happens eactly when we hit the high value + # moving backward it happens at the last low one we saw (one thing ago) + t = x[i] if direction == 1 else x[i+1] + if t == t_start: + print('EDGE EXACTLY AT EDGE FIND REQUEST') + elif (t - t_start) * direction < 0: + # we probably backed up to consider the point half a step before + # t_start don't count this one, and keep looking + continue + edges.append(t - t_start) + return edges + + +def find_edge_spice(x, y, t_start, height, forward=False, count=1, rising=True): + """ + Search through data (x,y) starting at time t_start for when the + waveform crosses height (defaut is ???). Searches backwards by + default (frequency now is probably based on the last few edges?) + """ + + # deal with `rising` and `forward` + # normally a low-to-high finder + if rising ^ forward: + y = [(-1 * z + 2 * height) for z in y] + direction = 1 if forward else -1 + + # we want to start on the far side of the interval containing t_start + # to make sure we catch any edge near t_start + side = 'right' if forward else 'left' + bump = -1 if forward else 0 + start_index = np.searchsorted(x, t_start, side=side) + bump + if start_index == len(x): + # happens when forward=False and the edge find is the end of the sim + start_index -= 1 + + i = start_index + edges = [] + while len(edges) < count: + # move until we hit low + while y[i] > height: + i += direction + if i < 0 or i >= len(y): + msg = f'only {len(edges)} of requested {count} edges found' + raise EdgeNotFoundError(msg) + # now move until we hit the high + while y[i] <= height: + i += direction + if i < 0 or i >= len(y): + msg = f'only {len(edges)} of requested {count} edges found' + raise EdgeNotFoundError(msg) + + # the crossing happens from i to i+1 + # fraction is how far you must go from i to (i-direction) to hit the + # crossing + fraction = (height - y[i]) / (y[i - direction] - y[i]) + # TODO for a long time this said "x[i + 1] - x[i]", I should + # triple-check that it's correct now + t = x[i] + fraction * (x[i-direction] - x[i]) + if t == t_start: + print('EDGE EXACTLY AT EDGE FIND REQUEST') + elif (t - t_start) * direction < 0: + # we probably backed up to consider the point half a step before + # t_start + # don't count this one, and keep looking + continue + edges.append(t - t_start) + return edges + + +def domain_read(cls): + class DomainGetValue(cls): + def is_domain_read(self): + return type(self.params) == dict and 'style' in self.params + + def get_format(self): + if self.is_domain_read(): + # this is a domain read + # this is pretty hacky... + return '%0.15f\\n", $realtime/1s);//' + else: + return super().get_format() + + def update_from_line(self, line): + if self.is_domain_read(): + # here we temporily put the time in self.value + # the time will be used as an index into the waveform file, + # and value will be changed later + # pycharm complains here that we are defining self.value + # outside __init__, but we know cls already has self.value + self.value = float(line.strip()) + else: + super().update_from_line(line) + + return DomainGetValue diff --git a/fault/ms_types.py b/fault/ms_types.py index 0ffd5821..ad65db31 100644 --- a/fault/ms_types.py +++ b/fault/ms_types.py @@ -10,6 +10,15 @@ class RealType(Digital): RealInOut = RealType[Direction.InOut] +class CurrentType(Digital): + pass + + +CurrentIn = CurrentType[Direction.In] +CurrentOut = CurrentType[Direction.Out] +CurrentInOut = CurrentType[Direction.InOut] + + class ElectType(Digital): pass diff --git a/fault/pwl.py b/fault/pwl.py index 5965fc4f..1310163a 100644 --- a/fault/pwl.py +++ b/fault/pwl.py @@ -11,10 +11,30 @@ def pwc_to_pwl(pwc, t_stop, t_tr, init=0): t_prev, v_prev = pwc[k - 1] t_curr, v_curr = pwc[k] + if retval[-1][0] >= t_curr: + assert retval[-2][0] <= t_curr, \ + 'non-increasing pwc steps at time' % t_curr + if abs(retval[-2][0] - t_curr) < 1e-19: + # two values at the same time, just drop the earlier + #print('DROPPING old thing, diff:', retval[-2][0] - t_curr) + retval.pop() + old_t, old_v = retval.pop() + v_prev = old_v + else: + # make the previous thing happen faster than t_tr + #print('DOING THE HALFWAY THING') + halfway_time = (retval[-2][0] + t_curr)/2 + retval[-1] = (halfway_time, retval[-1][1]) + + #print('times', t_curr, t_curr + t_tr) retval += [(t_curr, v_prev)] retval += [(t_curr + t_tr, v_curr)] # add final value + # cut off anything after t_stop + while len(retval) > 0 and retval[-1][0] >= t_stop: + #print('removing one from end') + retval.pop() retval += [(t_stop, pwc[-1][1])] # return new waveform diff --git a/fault/result_parse.py b/fault/result_parse.py index 808b3fc4..3c24d321 100644 --- a/fault/result_parse.py +++ b/fault/result_parse.py @@ -1,3 +1,6 @@ +import re +from fault.ms_types import RealType + try: import numpy as np from scipy.interpolate import interp1d @@ -14,6 +17,16 @@ def __init__(self, t, v): def __call__(self, t): return self.func(t) + +# TODO should probably rename this to something like "PWCResult" +class ResultInterp: + def __init__(self, t, v, interp='linear'): + self.t = t + self.v = v + self.func = interp1d(t, v, bounds_error=False, fill_value=(v[0], v[-1]), kind=interp) + + def __call__(self, t): + return self.func(t) # temporary measure -- CSDF parsing is broken in @@ -116,3 +129,92 @@ def data_to_interp(data, time, strip_vi=True): # return results return retval + + +def parse_vcd(filename, dut, interp='previous'): + # unfortunately this vcd parser has an annoyin quirk: + # it throws away the format specifier for numbers so we can't tell if they're binary or real + # so "reg [7:0] a = 8'd4;" and "real a = 100.0;" both come back as the string '100' + # TODO fix this + filename = 'build/' + filename + + # library doesn't grab timescale, so we do it manually + with open(filename) as f: + next = False + for line in f: + if next: + ts = line.strip() + break + if '$timescale' in line: + next = True + else: + assert False, f'Timescale not found in vcd {filename}' + + scales = { + 'fs': 1e-15, + 'ps': 1e-12, + 'ns': 1e-9, + 'us': 1e-6, + 'ms': 1e-3, + 's': 1e0 + } + ts.replace(' ', '') + scale_val_found = 1 + for scale_string, scale_val in scales.items(): + if scale_string in ts: + ts = ts.replace(scale_string, '') + scale_val_found *= scale_val + timescale = float(ts) * scale_val_found + #if ts[-1] not in scales: + # assert False, f'Unrecognized timescale {ts[-1]}' + #timescale = float(ts[0]) * scales[ts[1]] + + from vcdvcd import VCDVCD + obj = VCDVCD(filename) + + def get_name_from_v(v): + name_vcd = v.references[0].split('.')[-1] + name_fault = re.sub('\[[0-9]*:[0-9]*\]', '', name_vcd) + return name_fault + + def format(name, val_str): + if not hasattr(dut, name): + # we don't know what the type is, but we know scipy will try to cast it to float + # we can't assume bits becasue mlingua etol stuff is in this category + # we can't assume real becuase bit types with value 'x' come through here too + try: + return float(val_str) + except ValueError: + return float('nan') + + a = getattr(dut, name) + b = isinstance(a, RealType) + + if b: + return float(val_str) + else: + # TODO deal with x and z + # Right now it will invalidate the whole bus if one bit is x or z + if 'x' in val_str: + return float('nan') + elif 'z' in val_str: + return float('nan') + return int(val_str, 2) + + data = obj.get_data() + data = {get_name_from_v(v): v.tv for v in data.values()} + end_time = obj.get_endtime() + + retval = {} + for port, vs in data.items(): + t, v = zip(*vs) + t, v = list(t), list(v) + # append an additional point at the end time + t.append(end_time) + v.append(v[-1]) + t, v = [time * timescale for time in t], [format(port, val) for val in v] + + r = ResultInterp(t, v, interp=interp) + retval[port] = r + + return retval diff --git a/fault/spice.py b/fault/spice.py index c908d40b..4e72c562 100644 --- a/fault/spice.py +++ b/fault/spice.py @@ -75,7 +75,8 @@ def instantiate(self, name, *ports, inst_name=None): port_str = ' '.join(f'{port}' for port in ports) self.println(f'X{inst_name} {port_str} {name}') - def voltage(self, p, n, dc=None, pwl=None, inst_name=None): + def stimulus(self, letter, p, n, dc=None, pwl=None, inst_name=None): + # set defaults if inst_name is None: inst_name = f'{next(self.inst_count)}' @@ -84,7 +85,7 @@ def voltage(self, p, n, dc=None, pwl=None, inst_name=None): # build up the line line = [] - line += [f'V{inst_name}'] + line += [f'{letter}{inst_name}'] line += [f'{p}', f'{n}'] if dc is not None: line += ['DC', f'{dc}'] @@ -95,6 +96,12 @@ def voltage(self, p, n, dc=None, pwl=None, inst_name=None): # print the line self.println(' '.join(line)) + def voltage(self, p, n, dc=None, pwl=None, inst_name=None): + self.stimulus('V', p, n, dc=dc, pwl=pwl, inst_name=inst_name) + + def current(self, p, n, dc=None, pwl=None, inst_name=None): + self.stimulus('I', p, n, dc=dc, pwl=pwl, inst_name=inst_name) + def capacitor(self, p, n, value, inst_name=None): # set defaults if inst_name is None: @@ -109,6 +116,20 @@ def capacitor(self, p, n, value, inst_name=None): # print the line self.println(' '.join(line)) + def resistor(self, p, n, value, inst_name=None): + # set defaults + if inst_name is None: + inst_name = f'{next(self.inst_count)}' + + # build up the line + line = [] + line += [f'R{inst_name}'] + line += [f'{p}', f'{n}'] + line += [f'{value}'] + + # print the line + self.println(' '.join(line)) + def switch(self, sw_p, sw_n, ctl_p, ctl_n, mod_name, inst_name=None, default=None): # set defaults diff --git a/fault/spice_target.py b/fault/spice_target.py index ef31e431..d95f9cd9 100644 --- a/fault/spice_target.py +++ b/fault/spice_target.py @@ -11,8 +11,10 @@ from fault.subprocess_run import subprocess_run from fault.pwl import pwc_to_pwl from fault.actions import Poke, Expect, Delay, Print, GetValue, Eval +from fault.background_poke import background_poke_target from fault.select_path import SelectPath from .fault_errors import A2DError, ExpectError +from fault.domain_read import get_value_domain try: from decida.SimulatorNetlist import SimulatorNetlist @@ -20,7 +22,6 @@ except ModuleNotFoundError: print('Failed to import DeCiDa or Numpy for SpiceTarget.') - class CompiledSpiceActions: def __init__(self, pwls, checks, prints, stop_time, saves, gets): self.pwls = pwls @@ -75,10 +76,11 @@ def DeclareFromSpice(file_name, subckt_name=None, mode='digital'): ''' +@background_poke_target class SpiceTarget(Target): def __init__(self, circuit, directory="build/", simulator='ngspice', vsup=1.0, rout=1, model_paths=None, sim_env=None, - t_step=None, clock_step_delay=5, t_tr=0.2e-9, vil_rel=0.4, + t_step=None, clock_step_delay=5e-9, t_tr=0.2e-9, vil_rel=0.4, vih_rel=0.6, rz=1e9, conn_order='parse', bus_delim='<>', bus_order='descend', flags=None, ic=None, cap_loads=None, disp_type='on_error', mc_runs=0, mc_variations='all', @@ -204,13 +206,41 @@ def __init__(self, circuit, directory="build/", simulator='ngspice', # set list of signals to save self.saves = set() for name, port in self.circuit.interface.ports.items(): - if isinstance(port, m.BitsType): + if (isinstance(port, m.BitsType) + or isinstance(port, m.ArrayType)): for k in range(len(port)): self.saves.add(self.bit_from_bus(name, k)) else: self.saves.add(f'{name}') + class PortDict: + ''' + This exists because for ports, "a is b" does not imply "a == b". + I'm making the assumption here that the hash is the address, so hash + equality implies port equality. + I'm also assuming ports are unique, so pointer inequality implies port + inequality. + ''' + def __init__(self): + self.d = {} + + def myhash(self, item): + return str(item.name) + + def __setitem__(self, key, value): + self.d[self.myhash(key)] = (key, value) + + def __getitem__(self, item): + return self.d[self.myhash(item)][1] + + def __contains__(self, item): + return self.myhash(item) in self.d + + def items(self): + return (v for k, v in self.d.items()) + def run(self, actions): + # compile the actions comp = self.compile_actions(actions) @@ -229,8 +259,13 @@ def run(self, actions): # run the simulation commands if not self.no_run: - subprocess_run(cmd, cwd=self.directory, env=self.sim_env, + res = subprocess_run(cmd, cwd=self.directory, env=self.sim_env, disp_type=self.disp_type) + #print(res.stdout) + stderr = res.stderr.strip() + if stderr != '': + print('Stderr from spice simulator:') + print(stderr) # process the results for raw_file in raw_files: @@ -286,17 +321,24 @@ def get_value_at_bit(k): def compile_actions(self, actions): # initialize t = 0 - pwc_dict = {} + pwc_dict = self.PortDict() #{} checks = [] prints = [] gets = [] + # TODO is this still necessary? + saves = set() + # expand buses as needed _actions = [] for action in actions: if isinstance(action, (Poke, Expect)) \ and isinstance(action.port, m.Bits): _actions += self.expand_bus(action) + elif (isinstance(action, (Poke, Expect)) + and isinstance(action.port.name, m.ref.ArrayRef)): + action.port = self.select_bit_from_bus(action.port) + _actions.append(action) else: _actions.append(action) actions = _actions @@ -305,7 +347,11 @@ def compile_actions(self, actions): for action in actions: if isinstance(action, Poke): # add port to stimulus dictionary if needed - action_port_name = f'{action.port.name}' + # TODO change name of this variable + action_port_name = action.port # f'{action.port.name}' + # keys need to be port objects because we ask about the type + # later, but we can't compare port object equality directly + if action_port_name not in pwc_dict: pwc_dict[action_port_name] = ([], []) # determine the stimulus value, performing a digital @@ -325,13 +371,17 @@ def compile_actions(self, actions): pwc_dict[action_port_name][1].append((t, stim_s)) # increment time if desired if action.delay is None: - t += self.clock_step_delay * 1e-9 + t += self.clock_step_delay else: t += action.delay elif isinstance(action, Expect): checks.append((t, action)) elif isinstance(action, Print): prints.append((t, action)) + + # TODO: is this still necessary? + for port in action.ports: + saves.add(f'{port.name}') elif isinstance(action, GetValue): gets.append((t, action)) elif isinstance(action, Delay): @@ -342,7 +392,8 @@ def compile_actions(self, actions): raise NotImplementedError(action) # refactor stimulus voltages to PWL - pwls = {} + pwls = self.PortDict() + # TODO change "name" to "port" for name, pwc in pwc_dict.items(): pwls[name] = ( pwc_to_pwl(pwc=pwc[0], t_stop=t, t_tr=self.t_tr), @@ -381,6 +432,17 @@ def bit_from_bus(self, port, k): else: raise Exception(f'Unknown bus delimeter: {self.bus_delim}') + def select_bit_from_bus(self, port): + # The default way magma deals with naming one pin of a bus + # does not match our spice convention. We need to get the + # name of the original bus and index it ourselves. + bus_name = port.name.array.name + bus_index = port.name.index + bit_name = self.bit_from_bus(bus_name, bus_index) + new_port = m.Bit(name=bit_name) + return new_port + + def get_alpha_ordered_ports(self): # get ports sorted in alphabetical order port_names = self.circuit.interface.ports.keys() @@ -431,6 +493,15 @@ def write_test_bench(self, comp, tb_file=None): for port, val in self.cap_loads.items(): netlist.capacitor(f'{port.name}', '0', val) + # add a place to sink current outputs + for name, port in self.circuit.IO.ports.items(): + # NOTE: this finds current outputs, despite saying CurrentIn + if isinstance(port, fault.CurrentIn): + # TODO: is 1 Ohm good? + # RECALL we are measuring voltage here so 1 Ohm means volts=amps + # If we change this line we should change readout too + netlist.resistor(name, '0', '1') + # define the switch model inout_sw_mod = 'inout_sw_mod' netlist.start_subckt(inout_sw_mod, 'sw_p', 'sw_n', 'ctl_p', 'ctl_n') @@ -444,15 +515,28 @@ def write_test_bench(self, comp, tb_file=None): netlist.end_subckt() # write stimuli lines - for name, (pwl_v, pwl_s) in comp.pwls.items(): - # instantiate switch between voltage source and DUT - vnet = f'__{name}_v' - snet = f'__{name}_s' - netlist.instantiate('inout_sw_mod', vnet, name, snet, '0') - - # instantiate voltage source connected through switch - netlist.voltage(vnet, '0', pwl=pwl_v) - netlist.voltage(snet, '0', pwl=pwl_s) + port_name_mapping = {} + for port in self.circuit.IO.ports.values(): + pass + + for port, (pwl_v, pwl_s) in comp.pwls.items(): + name = f'{port.name}' + #port = self.circuit.IO.ports[name] + if isinstance(port, fault.CurrentType): + # TODO assert pwl_s is always high + netlist.current('0', name, pwl=pwl_v) + elif isinstance(port, (fault.RealType, m.Bit, type(m.Bit))): + # instantiate switch between voltage source and DUT + vnet = f'__{name}_v' + snet = f'__{name}_s' + netlist.instantiate('inout_sw_mod', vnet, name, snet, '0') + + # instantiate voltage source connected through switch + netlist.voltage(vnet, '0', pwl=pwl_v) + netlist.voltage(snet, '0', pwl=pwl_s) + else: + raise NotImplementedError( + f'Port type for {port} not implemented in spice target') # specify initial conditions if needed ic = {} @@ -566,15 +650,43 @@ def print_results(self, results, prints): for print_ in prints: self.impl_print(results=results, time=print_[0], action=print_[1]) + def process_reads(self, results, reads): + for time, read in reads: + res = results[f'{read.port.name}'] + def impl_all_gets(self, results, gets): for get in gets: self.impl_get(results=results, time=get[0], action=get[1]) def impl_get(self, results, time, action): - # get port values - port_value = results[f'{action.port.name}'](time) - # write value back to action - action.value = port_value + # TODO: use this same function in more places? + def get_spice_name(p): + if isinstance(p.name, m.ref.ArrayRef): + return str(self.select_bit_from_bus(p)) + else: + return f'{p.name}' + + # grab the relevant info + try: + res = results[get_spice_name(action.port)] + except KeyError: + res = results[get_spice_name(action.port).lower()] + if action.params == None: + # straightforward read of voltage + # get port values + port_value = res(time) + # write value back to action + action.value = port_value + elif type(action.params) == dict and 'style' in action.params: + # requires some analysis of signal + # get height of slice point based on spice config if not specified + if 'height' not in action.params: + action.params['height'] = self.vsup * (self.vih_rel + self.vil_rel) / 2 + # some styles (e.g. phase) might need to reference another port, + # so passing in the name is not good enough + get_value_domain(results, action, time, get_spice_name) + else: + raise NotImplementedError def ngspice_cmds(self, tb_file): # build up the command diff --git a/fault/system_verilog_target.py b/fault/system_verilog_target.py index 0d5d97ec..20394d51 100644 --- a/fault/system_verilog_target.py +++ b/fault/system_verilog_target.py @@ -13,6 +13,7 @@ from fault.select_path import SelectPath from fault.wrapper import PortWrapper from fault.subprocess_run import subprocess_run +from fault.background_poke import background_poke_target import fault import fault.expression as expression from fault.value import Value @@ -23,6 +24,7 @@ src_tpl = """\ +{includes} {timescale} module {top_module}; {imports} @@ -45,7 +47,7 @@ endmodule """ - +@background_poke_target class SystemVerilogTarget(VerilogTarget): # Language properties of SystemVerilog used in generating code blocks @@ -272,6 +274,8 @@ def __init__(self, circuit, circuit_name=None, directory="build/", raise ImportError("Cannot find kratos-runtime in the system. " "Please do \"pip install kratos-runtime\" " "to install.") + + self.includes = [] self.fsdb_dumpvars_args = fsdb_dumpvars_args # set up cadence tools command @@ -739,6 +743,18 @@ def generate_code(self, actions, power_args): if any(isinstance(action, GetValue) for action in actions): actions = [FileOpen(self.value_file)] + actions actions += [FileClose(self.value_file)] + + # check for reading in other domains + domain_read_ports = set() + for action in actions: + if not isinstance(action, GetValue): + continue + if type(action.params) == dict and 'style' in action.params: + domain_read_ports.add( action.port) + for port in domain_read_ports: + print('Interesting read on', port) + # TODO in the future, if not self.dump_waveforms, we need to dump this port anyway + pass # handle all of user-specified actions in the testbench for i, action in enumerate(actions): @@ -763,12 +779,16 @@ def generate_code(self, actions, power_args): declarations = '\n'.join(declarations) # format assignments - assigns = [f'{self.TAB}assign {lhs}={rhs};' + assigns = [f'{self.TAB}assign {lhs}' \ + f'={rhs};' for lhs, rhs in self.assigns.values()] assigns = '\n'.join(assigns) # add timescale timescale = f'`timescale {self.timescale}' + + # add includes + includes = '\n'.join(f'`include "{f}"' for f in self.includes) clock_drivers = self.TAB + "\n{self.TAB}".join(self.clock_drivers) @@ -780,6 +800,7 @@ def generate_code(self, actions, power_args): # fill out values in the testbench template src = src_tpl.format( + includes=includes, timescale=timescale, imports=imports, declarations=declarations, @@ -884,6 +905,7 @@ def run(self, actions, power_args=None): # post-process GetValue actions self.post_process_get_value_actions(actions) + def write_test_bench(self, actions, power_args): # determine the path of the testbench file tb_file = self.directory / Path(f'{self.circuit_name}_tb.sv') diff --git a/fault/tester/staged_tester.py b/fault/tester/staged_tester.py index c4f219cd..4ac9277f 100644 --- a/fault/tester/staged_tester.py +++ b/fault/tester/staged_tester.py @@ -175,12 +175,12 @@ def delay(self, time): """ self.actions.append(actions.Delay(time=time)) - def get_value(self, port): + def get_value(self, port, params=None): """ Returns an object with a "value" property that will be filled after the simulation completes. """ - action = actions.GetValue(port=port) + action = actions.GetValue(port=port, params=params) self.actions.append(action) return action diff --git a/fault/value_utils.py b/fault/value_utils.py index 634b5edb..7dd19590 100644 --- a/fault/value_utils.py +++ b/fault/value_utils.py @@ -4,7 +4,7 @@ from magma.protocol_type import MagmaProtocol from hwtypes import BitVector, Bit, FPVector from fault.value import AnyValue, UnknownValue, HiZ -from fault.ms_types import RealType +from fault.ms_types import RealType, CurrentType from fault.array import Array from fault.select_path import SelectPath from hwtypes.adt import Enum @@ -21,6 +21,8 @@ def make_value(type_, value): "MagmaProtocol") if issubclass(type_, RealType): return make_real(value) + if issubclass(type_, CurrentType): + return make_real(value) if issubclass(type_, magma.Digital): return make_bit(value) if issubclass(type_, magma.Array): diff --git a/fault/verilog_target.py b/fault/verilog_target.py index 265c11d6..4e09ed2a 100644 --- a/fault/verilog_target.py +++ b/fault/verilog_target.py @@ -7,7 +7,8 @@ from fault.util import flatten import os from fault.select_path import SelectPath - +from fault.result_parse import parse_vcd +from fault.domain_read import get_value_domain class VerilogTarget(Target): """ @@ -321,12 +322,32 @@ def make_block(self, i, name, cond, actions, label=None): def post_process_get_value_actions(self, all_actions): get_value_actions = [action for action in all_actions if isinstance(action, actions.GetValue)] + if len(get_value_actions) > 0: with open(self.value_file.name, 'r') as f: lines = f.readlines() for line, action in zip(lines, get_value_actions): action.update_from_line(line) + # sort out actions with params + get_value_actions_params = [action for action in get_value_actions if action.params != None] + + def get_name(port): + return str(port.name) + if len(get_value_actions_params) > 0: + # TODO waveform_file is technically a property of systemverilog target only + # I think we could make a small adjustment for Verilator but I haven't yet + if not hasattr(self, 'waveform_file'): + raise NotImplementedError('Domain Read not yet implemented' + + ' for targets without waveform_file') + err_msg = ('No waveform file found for domain_read. ' + + 'Did you compile Tester with "dump_waveforms=True"?') + assert self.waveform_file is not None, err_msg + res = parse_vcd(self.waveform_file, self.circuit) + for a in get_value_actions_params: + # the time has already been temporarily stored in a.value + get_value_domain(res, a, a.value, get_name) + @staticmethod def in_var(file): '''Name of variable used to read in contents of file.''' diff --git a/tests/spice/mycurrenttest.sp b/tests/spice/mycurrenttest.sp new file mode 100644 index 00000000..a92bef5c --- /dev/null +++ b/tests/spice/mycurrenttest.sp @@ -0,0 +1,18 @@ +* Circuit to test current inputs and outputs +* in_c: input current +* in_v: input voltage +* out_c: output current = in_v / 100 Ohm +* out_v: output voltage = in_c * 500 Ohm + +.subckt mycurrenttest in_c in_v out_c out_v + +Vmeas in_v meas_node 0 +R1 meas_node 0 100 +* TODO mirror current +F1 0 out_c Vmeas 1 +*R4 out_c 0 123 + +R2 in_c 0 500 +R3 in_c out_v 10 + +.ends diff --git a/tests/test_background_poke.py b/tests/test_background_poke.py new file mode 100644 index 00000000..0b1007c0 --- /dev/null +++ b/tests/test_background_poke.py @@ -0,0 +1,43 @@ +import fault +from .common import pytest_sim_params, TestBasicCircuit + + +def pytest_generate_tests(metafunc): + # Not implemented for verilator because this doesn't make much sense + # without a concept of delay + pytest_sim_params(metafunc, 'system-verilog') + + +def test_clock_verilog(target, simulator): + circ = TestBasicCircuit + tester = fault.Tester(circ) + tester.zero_inputs() + tester.poke(circ.I, 1) + tester.eval() + tester.expect(circ.O, 1) + + # register clock + tester.poke(circ.I, 0, delay={'freq': 1e9, 'duty_cycle': 0.75}) + + # This delay would move the "expect"s off the edge. + # Right now, the test ensures that an expect directly on the edge gets the + # post-edge value. + # tester.delay(.0625e-9) + + # Break the cycle into eighths. First 2 eighths read 0, next 6 read 1 + for i in range(100): + for j in range(2): + tester.eval() + tester.expect(circ.O, 0) + tester.delay(0.125e-9) + for j in range(6): + tester.eval() + tester.expect(circ.O, 1) + tester.delay(0.125e-9) + + tester.print("%08x", circ.O) + + if target == "verilator": + tester.compile_and_run(target, tmp_dir=True, flags=["-Wno-fatal"]) + else: + tester.compile_and_run(target, tmp_dir=True, simulator=simulator) diff --git a/tests/test_background_poke_analog.py b/tests/test_background_poke_analog.py new file mode 100644 index 00000000..b962f633 --- /dev/null +++ b/tests/test_background_poke_analog.py @@ -0,0 +1,92 @@ +import magma as m +import fault +from pathlib import Path +from .common import pytest_sim_params +import math + + +def plot(xs, ys): + import matplotlib.pyplot as plt + plt.plot(xs, ys, '*') + plt.grid() + plt.show() + + +def pytest_generate_tests(metafunc): + # Not implemented for verilator because this doesn't make much sense + # without a concept of delay + pytest_sim_params(metafunc, 'spice') + + +def test_sin_spice(target, simulator, vsup=1.5): + # declare circuit + myinv = m.DeclareCircuit( + 'myinv', + 'in_', fault.RealIn, + 'out', fault.RealOut, + 'vdd', fault.RealIn, + 'vss', fault.RealIn + ) + + # wrap if needed + if target == 'verilog-ams': + dut = fault.VAMSWrap(myinv) + else: + dut = myinv + + # define the test + tester = fault.Tester(dut) + tester.poke(dut.vdd, vsup) + tester.poke(dut.vss, 0) + freq = 1e3 + amp = 0.4 + offset = 0.6 + phase_d = 90 + tester.poke(dut.in_, 0, delay={ + 'type': 'sin', + 'freq': freq, + 'amplitude': amp, + 'offset': offset, + 'phase_degrees': phase_d, + 'dt': 1 / (freq * 30) + }) + + def model(x): + return (amp * math.sin(2 * math.pi * freq * x + math.radians(phase_d)) + + offset) + + num_reads = 100 + xs = [] + dt = 1 / (freq * 50) + gets = [] + for k in range(num_reads): + gets.append(tester.get_value(dut.in_)) + tester.delay(dt) + xs.append(k * dt) + + # set options + kwargs = dict( + target=target, + simulator=simulator, + model_paths=[Path('tests/spice/myinv.sp').resolve()], + vsup=vsup, + tmp_dir=True, + clock_step_delay=0 + ) + if target == 'verilog-ams': + kwargs['use_spice'] = ['myinv'] + + # run the simulation + tester.compile_and_run(**kwargs) + + ys = [] + for k in range(num_reads): + value = gets[k].value + ys.append(value) + print('%2d\t' % k, value) + + assert abs(model(xs[k]) - ys[k]) <= amp * 0.25 + + # Debug plots + # plot(xs, ys) + # plot(xs, [model(x) for x in xs]) diff --git a/tests/test_get_value_analog.py b/tests/test_get_value_analog.py index 67c3dcbb..c679a91c 100644 --- a/tests/test_get_value_analog.py +++ b/tests/test_get_value_analog.py @@ -61,4 +61,10 @@ def model(a, b): for (a, b), c in zip(stim, output): lb = model(a, b) - 0.01 ub = model(a, b) + 0.01 + print('Asserting', lb, c.value, ub) assert lb <= c.value <= ub + +if __name__ == '__main__': + #test_get_value_analog('spice', 'ngspice') + #test_get_value_analog('spice', 'hspice') + test_get_value_analog('spice', 'spectre') diff --git a/tests/test_get_value_domain_analog.py b/tests/test_get_value_domain_analog.py new file mode 100644 index 00000000..8faf85ea --- /dev/null +++ b/tests/test_get_value_domain_analog.py @@ -0,0 +1,166 @@ +import magma as m +import fault +from pathlib import Path +from .common import pytest_sim_params + + +def pytest_generate_tests(metafunc): + pytest_sim_params(metafunc, 'spice') + + +def get_inv_tester(target, vsup): + # declare circuit + myinv = m.DeclareCircuit( + 'myinv', + 'in_', fault.RealIn, + 'out', fault.RealOut, + 'vdd', fault.RealIn, + 'vss', fault.RealIn + ) + + # wrap if needed + if target == 'verilog-ams': + dut = fault.VAMSWrap(myinv) + else: + dut = myinv + + tester = fault.Tester(dut) + tester.poke(dut.vdd, vsup) + tester.poke(dut.vss, 0) + return dut, tester + + +def test_edge(target, simulator, vsup=1.8): + dut, tester = get_inv_tester(target, vsup) + + tester.delay(10e-3) + + # Each horizontal line is 0.5ms (spaces don't count) + # Input Waveform: _ _ _ _ _ _ _ + # ***_ _ _| |_ _ _| |_ _ _| |_ _ _ _| |_ _ _ _| |_ *** + + # Output Waveform: + # _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ + # *** |_| |_| |_| |_ _| |_ _| *** + # get_value: ^ + tester.poke(dut.in_, 0, delay=1.5e-3) + tester.poke(dut.in_, vsup, delay=0.5e-3) + tester.poke(dut.in_, 0, delay=1.5e-3) + tester.poke(dut.in_, vsup, delay=0.5e-3) + tester.poke(dut.in_, 0, delay=1.5e-3) + tester.poke(dut.in_, vsup, delay=0.5e-3) + + # We add a significant cap load to delay the output slightly, + # so we only catch the nearby edge when looking forwards + # default is to look backwards and find rising edges + a = tester.get_value(dut.out, params={'style': 'edge', 'count': 2}) + a_expect = [-2e-3, -4e-3] + b = tester.get_value(dut.out, + params={'style': 'edge', 'count': 2, 'rising': False}) + b_expect = [-0.5e-3, -2.5e-3] + c = tester.get_value(dut.out, + params={'style': 'edge', 'count': 2, 'forward': True}) + c_expect = [0, 3e-3] + d = tester.get_value(dut.out, + params={'style': 'edge', 'count': 2, 'forward': True, + 'rising': False}) + d_expect = [2e-3, 5e-3] + + tester.poke(dut.in_, 0, delay=2e-3) + tester.poke(dut.in_, vsup, delay=1e-3) + tester.poke(dut.in_, 0, delay=2e-3) + tester.poke(dut.in_, vsup, delay=1e-3) + tester.poke(dut.in_, 0, delay=2e-3) + tester.poke(dut.in_, vsup, delay=1e-3) + + # set options + kwargs = dict( + target=target, + simulator=simulator, + model_paths=[Path('tests/spice/myinv.sp').resolve()], + vsup=vsup, + cap_loads={dut.out: 10e-9} + ) + if target == 'verilog-ams': + kwargs['use_spice'] = ['myinv'] + + # run the simulation + tester.compile_and_run(**kwargs) + + def eq(xs, ys): + for x, y in zip(xs, ys): + if abs(x - y) > 5e-5: + return False + return True + + print('Measured edge timings:') + print(a.value) + print(b.value) + print(c.value) + print(d.value) + + assert eq(a.value, a_expect) + assert eq(b.value, b_expect) + assert eq(c.value, c_expect) + assert eq(d.value, d_expect) + + +def test_phase(target, simulator, vsup=1.5): + # declare circuit + mybus = m.DeclareCircuit( + 'mybus', + 'a', m.In(m.Bits[2]), + 'b', m.Out(m.Bits[3]), + 'vdd', m.BitIn, + 'vss', m.BitIn + ) + + # wrap if needed + if target == 'verilog-ams': + dut = fault.VAMSWrap(mybus) + else: + dut = mybus + + # define the test + tester = fault.Tester(dut) + tester.poke(dut.vdd, 1) + tester.poke(dut.vss, 0) + + # in[0] gets inverted, in[1] gets buffered + # I want in[0] to be a 1kHz clock + # I want in[1] to be a 1kHz clock but delayed by 0.2 ms, so 0.2 cycles + tester.poke(dut.a[0], 1, delay=0.2e-3) + tester.poke(dut.a[1], 1, delay=0.3e-3) + tester.poke(dut.a[0], 0, delay=0.2e-3) + tester.poke(dut.a[1], 0, delay=0.3e-3) + tester.poke(dut.a[0], 1, delay=0.2e-3) + tester.poke(dut.a[1], 1, delay=0.3e-3) + tester.poke(dut.a[0], 0, delay=0.2e-3) + tester.poke(dut.a[1], 0, delay=0.3e-3) + tester.poke(dut.a[0], 1, delay=0.2e-3) + tester.poke(dut.a[1], 1, delay=0.3e-3) + + a = tester.get_value(dut.a[1], params={'style': 'phase', 'ref': dut.a[0]}) + b = tester.get_value(dut.a[1], params={'style': 'phase', 'ref': dut.b[0]}) + c = tester.get_value(dut.a[0], params={'style': 'phase', 'ref': dut.a[1]}) + + # set options + kwargs = dict( + target=target, + simulator=simulator, + model_paths=[Path('tests/spice/mybus.sp').resolve()], + vsup=vsup, + ) + if target == 'verilog-ams': + kwargs['use_spice'] = ['mybus'] + + # run the simulation + tester.compile_and_run(**kwargs) + + print('Look at measured phases') + print(a.value) + print(b.value) + print(c.value) + + assert abs(a.value - 0.2) < 1e-2 + assert abs(b.value - 0.7) < 1e-2 diff --git a/tests/test_get_value_domain_digital.py b/tests/test_get_value_domain_digital.py new file mode 100644 index 00000000..c1586984 --- /dev/null +++ b/tests/test_get_value_domain_digital.py @@ -0,0 +1,92 @@ +from pathlib import Path +import fault +import magma as m +from .common import pytest_sim_params + + +def pytest_generate_tests(metafunc): + # Not implemented for verilator right now + # The difficulty is that we have the action print out the simulation time + # Later we use that time to know where to look in the waveform dump + # But verilator doesn't have the same concept of simulation time + pytest_sim_params(metafunc, 'system-verilog') + + +class MyAdder(m.Circuit): + io = m.IO(a=m.In(m.UInt[4]), + b=m.Out(m.UInt[4])) + + io.b @= io.a + 1 + + +def test_get_value_digital(target, simulator): + # define test + tester = fault.Tester(MyAdder) + + # provide stimulus + stim = [2, 3, 2, 3, 2, 3] + output = [] + freq = 20e6 + for a in stim: + tester.poke(MyAdder.a, a, delay=(.5 / freq)) + tester.eval() + + output.append(tester.get_value(MyAdder.b, params={ + 'style': 'frequency', + 'height': 3.5 + })) + + # run the test + kwargs = dict( + target=target, + ) + if target == 'system-verilog': + # This is the only case right now, might do Verilator in the future + kwargs['simulator'] = simulator + kwargs['dump_waveforms'] = True + # tmp_dir seems to break this + # kwargs['tmp_dir'] = True + + tester.compile_and_run(**kwargs) + + # check the results + assert freq / 1.05 < output[0].value < freq * 1.05 + + +def test_real_val(target, simulator): + # define the circuit + class realadd(m.Circuit): + io = m.IO(a_val=fault.RealIn, b_val=fault.RealIn, c_val=fault.RealOut) + + # define test content + # output will toggle between 4 and 8 + stim = [1, 5, 1, 5, 1, 5] + tester = fault.Tester(realadd) + tester.poke(realadd.b_val, 3) + tester.eval() + + freq = 4e6 + for v in stim: + tester.poke(realadd.a_val, v, delay=1 / (2 * freq)) + # TODO this eval actually adds a 1ns delay, + # which breaks this test at higher frequencies + tester.eval() + + res = tester.get_value(realadd.c_val, params={ + 'style': 'frequency', + 'height': 6 + }) + + # run the test + tester.compile_and_run( + target=target, + simulator=simulator, + ext_libs=[Path('tests/verilog/realadd.sv').resolve()], + defines={f'__{simulator.upper()}__': None}, + ext_model_file=True, + dump_waveforms=True, + ) + + # check the results + print(freq / 1.05, res.value, freq * 1.05) + assert freq / 1.05 < res.value < freq * 1.05 diff --git a/tests/test_spice_bus.py b/tests/test_spice_bus.py index 671e0dd8..6807f445 100644 --- a/tests/test_spice_bus.py +++ b/tests/test_spice_bus.py @@ -26,21 +26,44 @@ class dut(m.Circuit): # step through all possible inputs tester.poke(dut.a, 0b000) + tester.delay(1e-6) tester.expect(dut.b, 0b101) tester.poke(dut.a, 0b001) + tester.delay(1e-6) tester.expect(dut.b, 0b100) tester.poke(dut.a, 0b010) + tester.delay(1e-6) tester.expect(dut.b, 0b111) tester.poke(dut.a, 0b011) + tester.delay(1e-6) tester.expect(dut.b, 0b110) + # test one bit of the bus at a time + tester.poke(dut.a[0], 0) + tester.delay(1e-6) + tester.expect(dut.b[0], 1) + tester.poke(dut.a[0], 1) + tester.delay(1e-6) + tester.expect(dut.b[0], 0) + tester.delay(1e-6) + tester.expect(dut.b[2], 1) + + tester.poke(dut.a[1], 0) + tester.delay(1e-6) + tester.expect(dut.b[1], 0) + tester.poke(dut.a[1], 1) + tester.delay(1e-6) + tester.expect(dut.b[1], 1) + tester.delay(1e-6) + tester.expect(dut.b[2], 1) + # set options kwargs = dict( target=target, simulator=simulator, model_paths=[Path('tests/spice/mybus.sp').resolve()], vsup=vsup, - tmp_dir=True + tmp_dir=False ) # run the simulation diff --git a/tests/test_spice_current.py b/tests/test_spice_current.py new file mode 100644 index 00000000..cd9859d6 --- /dev/null +++ b/tests/test_spice_current.py @@ -0,0 +1,69 @@ +import magma as m +import fault +from pathlib import Path +from .common import pytest_sim_params + + +def pytest_generate_tests(metafunc): + pytest_sim_params(metafunc, 'spice') + + +def test_get_value_analog(target, simulator): + # declare circuit + class mycurrenttest(m.Circuit): + io = m.IO( + in_c=fault.CurrentIn, + in_v=fault.RealIn, + out_c=fault.CurrentOut, + out_v=fault.RealOut + ) + + # wrap if needed + if target == 'verilog-ams': + dut = fault.VAMSWrap(mycurrenttest) + else: + dut = mycurrenttest + + # create the tester + tester = fault.Tester(dut) + + # define the test + stim = [(5.6, 7.8), (-6.5, -8.7)] + output = [] + for a, b in stim: + tester.poke(dut.in_c, a) + tester.poke(dut.in_v, b) + tester.delay(1e-6) + output.append((tester.get_value(dut.out_c), + tester.get_value(dut.out_v))) + + # set options + kwargs = dict( + target=target, + simulator=simulator, + tmp_dir=False + ) + spice_model = Path('tests/spice/mycurrenttest.sp').resolve() + if target == 'verilog-ams': + kwargs['model_paths'] = [spice_model] + kwargs['use_spice'] = ['mycurrenttest'] + elif target == 'spice': + kwargs['model_paths'] = [spice_model] + + # run the simulation + tester.compile_and_run(**kwargs) + + # check the results using Python assertions + def model(a, b): + return (b / 100, a*500) + + for (a, b), (c, d) in zip(stim, output): + for expected, read in zip(model(a, b), (c, d)): + lb = expected - 0.01 + ub = expected + 0.01 + print('Asserting', lb, read.value, ub) + assert lb <= read.value <= ub + + +if __name__ == '__main__': + test_get_value_analog('spice', 'ngspice')