ut803-plot_marenz_github/es51922.py

379 lines
11 KiB
Python
Executable File

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Utility for parsing data from multimeters based on Cyrustek ES51922 chipset.
Written using as much information from the datasheet as possible
(some functionality is not documented).
The utility should output only sensible measurements and checks if
the data packet is valid (there is no check sum in the data packet).
Tested with UNI-T UT61E multimeter.
All the functionality of UNI-T UT61E seems to work fine.
Not tested: temperature and ADP modes.
Licenced LGPL2+
Copyright
(C) 2013 Domas Jokubauskis (domas@jokubauskis.lt)
(C) 2014 Philipp Klaus (philipp.l.klaus@web.de)
Some information was used from dmmut61e utility by Steffen Vogel
"""
from __future__ import print_function
import sys
from decimal import Decimal
import struct
import logging
import datetime
def test_bit(int_type, offset):
"""
testBit() returns True if the bit at 'offset' is one.
From http://wiki.python.org/moin/BitManipulation
"""
mask = 1 << offset
return bool(int_type & mask)
def get_bits(int_type, template):
"""
Extracts 'named bits' from int_type.
Naming the bits works by supplying a list of
bit names (or fixed bits as 0/1) via template.
"""
bits = {}
for i in range(4):
bit = test_bit(int_type, i)
bit_name = template[3-i]
#print(bit, bit_name, i)
if bit_name in (0,1) and bit==bit_name:
continue
elif bit_name in (0,1):
raise ValueError
else:
bits[bit_name] = bit
return bits
"""
The entries in the following RANGE dictionaries have the following structure:
(value_multiplier, dp_digit_position, display_unit)
value_multiplier: Multiply the displayed value by this factor to get the value in base units.
dp_digit_position: The digit position of the decimal point in the displayed meter reading value.
display_unit: The unit the displayed value is shown in.
"""
RANGE_DIODE = [
(1e0, 3, "V"), #6.000V
]
RANGE_FREQUENCY = [
(1e0, 0, "Hz"), #6000Hz
(1e3, 2, "kHz"), #60.00kHz
(1e3, 1, "kHz"), #600.0kHz
(1e6, 3, "MHz"), #6.000MHz
(1e6, 2, "MHz"), #60.00MHz
]
RANGE_RESISTANCE = [
(1e0, 1, "Ω"), #600.0Ω
(1e3, 3, ""), #6.000KΩ
(1e3, 2, ""), #60.00KΩ
(1e3, 1, ""), #600.0KΩ
(1e6, 3, ""), #6.000MΩ
(1e6, 2, ""), #60.00MΩ
]
RANGE_CONTINUITY = [
(1e0, 1, "Ω"), #600.0Ω
]
RANGE_CAPACITANCE = [
(1e-9, 3, "nF"), #6.000nF
(1e-9, 2, "nF"), #60.00nF
(1e-9, 1, "nF"), #600.0nF
(1e-6, 3, "µF"), #6.000μF
(1e-6, 2, "µF"), #60.00μF
(1e-6, 1, "µF"), #600.0μF
(1e-3, 3, "mF"), #6.000mF
]
RANGE_CURRENT_10A = [
(1e0, 2, "A") #10.00A
]
RANGE_VOLTAGE = [
(1e0, 3, "V"), #6.000V
(1e0, 2, "V"), #60.00V
(1e0, 1, "V"), #600.0V
(1e0, 0, "V"), #1000V
(1e-1, 1,"mV"), #600.0mV
]
RANGE_CURRENT_AUTO_UA = [
(1e-6, 1, "µA"), #
(1e-6, 0, "µA"), #2
]
RANGE_HFE = [
(1e0, 0, "hFe"),
]
RANGE_CURRENT_AUTO_MA = [
(1e-3, 2, "mA"), #
(1e-3, 1, "mA"), #2
]
FUNCTION = {
# (function, subfunction, unit)
0x01: ("diode", RANGE_DIODE, "V"),
0x02: ("frequency", RANGE_FREQUENCY, "Hz"),
0x03: ("resistance", RANGE_RESISTANCE, "Ω"),
0x04: ("temperature", None, "deg"),
0x05: ("continuity", RANGE_CONTINUITY, "Ω"),
0x06: ("capacitance", RANGE_CAPACITANCE, "F"),
0x09: ("current", RANGE_CURRENT_10A, "A"), #10 A current
0x0b: ("voltage", RANGE_VOLTAGE, "V"),
0x0d: ("current", RANGE_CURRENT_AUTO_UA, "A"),
0x0e: ("current gain", RANGE_HFE, ""),
0x0f: ("current", RANGE_CURRENT_AUTO_MA, "A"),
}
STATUS = [
"JUDGE", # 1-°C, 0-°F.
"SIGN", # 1-minus sign, 0-no sign
"BATT", # 1-battery low
"OL", # input overflow
]
OPTION1 = [
"HOLD",
"MAX", # maximum
"MIN", # minimum
0,
]
OPTION2 = [
"DC",
"AC",
"AUTO",
0,
]
def parse(packet):
"""
The most important function of this module:
Parses 9-byte-long packets from the UT803 DMM and returns
a dictionary with all information extracted from the packet.
"""
d_range, \
d_digit0, d_digit1, d_digit2, d_digit3, \
d_function, d_status, \
d_option1, d_option2 = struct.unpack("B"*9, packet)
options = {}
d_options = (d_status, d_option1, d_option2)
OPTIONS = (STATUS, OPTION1, OPTION2)
for d_option, OPTION in zip(d_options, OPTIONS):
bits = get_bits(d_option, OPTION)
options.update(bits)
function = FUNCTION[d_function & 0x0f]
mode = function[0]
m_range = function[1][d_range & 0x0f]
unit = function[2]
if mode == "frequency" and options["JUDGE"]:
mode = "duty_cycle"
unit = "%"
m_range = (1e0, 1, "%") #2200.0°C
current = None
if options["AC"] and options["DC"]:
raise ValueError
elif options["DC"]:
current = "DC"
elif options["AC"]:
current = "AC"
operation = "normal"
if options["OL"]:
operation = "overload"
if options["AUTO"]:
mrange = "auto"
else:
mrange = "manual"
if options["BATT"]:
battery_low = True
else:
battery_low = False
# data hold mode, received value is actual!
if options["HOLD"]:
hold = True
else:
hold = False
peak = None
if options["MAX"]:
peak = "max"
elif options["MIN"]:
peak = "min"
digits = [d_digit0, d_digit1, d_digit2, d_digit3]
digits = [digit & 0x0f for digit in digits]
display_value = 0
for i, digit in zip(range(4), digits):
display_value += digit*(10**(3-i))
if options["SIGN"]: display_value = -display_value
display_value = Decimal(display_value) / 10**m_range[1]
display_unit = m_range[2]
value = float(display_value) * m_range[0]
if operation != "normal":
display_value = ""
value = ""
results = {
'value' : value,
'unit' : unit,
'display_value' : display_value,
'display_unit' : display_unit,
'mode' : mode,
'current' : current,
'peak' : peak,
'hold' : hold,
'range' : mrange,
'operation' : operation,
'battery_low' : battery_low
}
return results
def output_readable(results):
operation = results["operation"]
battery_low = results["battery_low"]
if operation == "normal":
display_value = results["display_value"]
display_unit = results["display_unit"]
line = "{value} {unit}".format(value=display_value, unit=display_unit)
else:
line = "-, the measurement is {operation}ed!".format(operation=operation)
if battery_low:
line.append(" Battery low!")
return line
def format_field(results, field_name):
"""
Helper function for output formatting.
"""
value = results[field_name]
if field_name == "value":
if results["operation"]=="normal":
return str(value)
else:
return ""
if value==None:
return ""
elif value==True:
return "1"
elif value==False:
return "0"
else:
return str(value)
CSV_FIELDS = ["value", "unit", "mode", "current", "operation", "peak",
"battery_low", "hold"]
def output_csv(results):
"""
Helper function to write output lines to a CSV file.
"""
field_data = [format_field(results, field_name) for field_name in CSV_FIELDS]
line = ";".join(field_data)
return line
def main():
"""
Main function: Entry point if running this module from the command line.
Reads lines from stdin and parses them as ES51922 messages.
Prints to stdout and to a CSV file.
"""
import argparse
parser = argparse.ArgumentParser(description='Utility for parsing data from multimeters based on Cyrustek ES51922 chipset.')
parser.add_argument('-m', '--mode', choices=['csv', 'plot', 'readable'],
default="csv",
help='output mode (default: csv)')
parser.add_argument('-f', '--file',
help='output file')
parser.add_argument('--verbose', action='store_true',
help='enable verbose output')
args = parser.parse_args()
if args.verbose:
log_level = logging.DEBUG
else:
log_level = logging.INFO
logging.basicConfig(format='%(levelname)s:%(message)s', level=log_level)
output_file = None
if args.mode == 'csv':
timestamp = datetime.datetime.now()
date_format = "%Y-%m-%d_%H:%S"
timestamp = timestamp.strftime(date_format)
if args.file:
file_name = args.file
else:
file_name = "measurement_{}.csv".format(timestamp)
output_file = open(file_name, "w")
logging.info('Writing to file "{}"'.format(file_name))
header = "timestamp;{}\n".format(";".join(CSV_FIELDS))
output_file.write(header)
elif args.mode == 'plot':
if args.file:
file_name = args.file
else:
logging.error('No file name specified')
while True:
line = sys.stdin.readline()
if not line: break
line = line.strip()
try:
line = line.encode('ascii')
except:
logging.warning('Not an ASCII input line, ignoring: "{}"'.format(line))
continue
timestamp = datetime.datetime.now()
timestamp = timestamp.isoformat(sep=' ')
if len(line)==9:
try:
results = parse(line)
except Exception as e:
logging.warning('Error "{}" packet from multimeter: "{}"'.format(e, line))
if args.mode == 'csv':
line = output_csv(results)
output_file.write("{};{}\n".format(timestamp, line))
elif args.mode == 'readable':
line = output_readable(results)
print(timestamp.split(" ")[1], line)
elif args.mode == 'plot':
ost = results['mode'] + ': '
if results['operation'] != 'normal':
ost += 'overload'
else:
ost += str(results['value']) + results['unit']
output_file = open(file_name, 'a')
output_file.write(ost + '\n')
output_file.close()
print(ost)
else:
raise NotImplementedError
elif line:
logging.warning('Unknown packet from multimeter: "{}", length: {}'.format(line, len(line)))
else:
logging.warning('Not a response from the multimeter: ""'.format(line))
if __name__ == "__main__":
main()