379 lines
11 KiB
Python
Executable File
379 lines
11 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Utility for parsing data from multimeters based on Cyrustek ES51922 chipset.
|
|
|
|
Written using as much information from the datasheet as possible
|
|
(some functionality is not documented).
|
|
The utility should output only sensible measurements and checks if
|
|
the data packet is valid (there is no check sum in the data packet).
|
|
|
|
Tested with UNI-T UT61E multimeter.
|
|
All the functionality of UNI-T UT61E seems to work fine.
|
|
Not tested: temperature and ADP modes.
|
|
|
|
Licensed under LGPL2+
|
|
Copyright
|
|
(C) 2013 Domas Jokubauskis (domas@jokubauskis.lt)
|
|
(C) 2014 Philipp Klaus (philipp.l.klaus@web.de)
|
|
|
|
Some information was used from dmmut61e utility by Steffen Vogel
|
|
"""
|
|
|
|
from __future__ import print_function
|
|
import sys
|
|
from decimal import Decimal
|
|
import struct
|
|
import logging
|
|
import datetime
|
|
|
|
def test_bit(int_type, offset):
|
|
"""
|
|
testBit() returns True if the bit at 'offset' is one.
|
|
From http://wiki.python.org/moin/BitManipulation
|
|
"""
|
|
mask = 1 << offset
|
|
return bool(int_type & mask)
|
|
|
|
def get_bits(int_type, template):
    """
    Extract 'named bits' from the four lowest bits of int_type.

    template is a sequence of four entries describing bit 3 down to bit 0
    (most significant bit first).  Each entry is either
      * a bit name: the state of that bit is stored under this name in
        the returned dictionary, or
      * the integer 0 or 1: the bit is fixed and has to carry exactly
        this value.

    Bits above bit 3 of int_type are not looked at.

    Returns a dictionary mapping every bit name to True or False.
    Raises ValueError (with a description of the offending bit) if a
    fixed bit does not have the expected value.
    """
    bits = {}
    for position in range(4):
        bit = bool((int_type >> position) & 1)
        # The template lists the most significant bit first.
        expected = template[3 - position]
        if expected in (0, 1):
            if bit != expected:
                raise ValueError(
                    "fixed bit {} of 0x{:02x} is {}, expected {}".format(
                        position, int_type, int(bit), expected))
        else:
            bits[expected] = bit
    return bits
|
|
|
|
"""
|
|
The entries in the following RANGE dictionaries have the following structure:
|
|
|
|
(value_multiplier, dp_digit_position, display_unit)
|
|
|
|
value_multiplier: Multiply the displayed value by this factor to get the value in base units.
|
|
dp_digit_position: The digit position of the decimal point in the displayed meter reading value.
|
|
display_unit: The unit the displayed value is shown in.
|
|
"""
|
|
# Range tables, one per measurement function.
#
# Every entry is (value_multiplier, dp_digit_position, display_unit).
# The position of an entry inside its list is the range index that
# parse() takes from the low nibble of the packet's range byte.
# The trailing comments show the display layout of the respective range
# as noted by the original authors.

RANGE_DIODE = [
    (1e0, 3, "V"),      # 6.000V
]

RANGE_FREQUENCY = [
    (1e0, 0, "Hz"),     # 6000Hz
    (1e3, 2, "kHz"),    # 60.00kHz
    (1e3, 1, "kHz"),    # 600.0kHz
    (1e6, 3, "MHz"),    # 6.000MHz
    (1e6, 2, "MHz"),    # 60.00MHz
]

RANGE_RESISTANCE = [
    (1e0, 1, "Ω"),      # 600.0Ω
    (1e3, 3, "kΩ"),     # 6.000kΩ
    (1e3, 2, "kΩ"),     # 60.00kΩ
    (1e3, 1, "kΩ"),     # 600.0kΩ
    (1e6, 3, "MΩ"),     # 6.000MΩ
    (1e6, 2, "MΩ"),     # 60.00MΩ
]

RANGE_CONTINUITY = [
    (1e0, 1, "Ω"),      # 600.0Ω
]

RANGE_CAPACITANCE = [
    (1e-9, 3, "nF"),    # 6.000nF
    (1e-9, 2, "nF"),    # 60.00nF
    (1e-9, 1, "nF"),    # 600.0nF
    (1e-6, 3, "µF"),    # 6.000µF
    (1e-6, 2, "µF"),    # 60.00µF
    (1e-6, 1, "µF"),    # 600.0µF
    (1e-3, 3, "mF"),    # 6.000mF
]

RANGE_CURRENT_10A = [
    (1e0, 2, "A"),      # 10.00A
]

RANGE_VOLTAGE = [
    (1e0, 3, "V"),      # 6.000V
    (1e0, 2, "V"),      # 60.00V
    (1e0, 1, "V"),      # 600.0V
    (1e0, 0, "V"),      # 1000V
    # NOTE(review): a reading displayed in mV would need 1e-3 to end up
    # in volts; 1e-1 looks inconsistent with the multiplier definition
    # above -- confirm against the meter before changing it.
    (1e-1, 1, "mV"),    # 600.0mV
]

RANGE_CURRENT_AUTO_UA = [
    (1e-6, 1, "µA"),    # one decimal place
    (1e-6, 0, "µA"),    # no decimal place
]

RANGE_HFE = [
    (1e0, 0, "hFe"),
]

RANGE_CURRENT_AUTO_MA = [
    (1e-3, 2, "mA"),    # two decimal places
    (1e-3, 1, "mA"),    # one decimal place
]
|
|
|
|
# Maps the function code (low nibble of the packet's function byte, as
# used by parse()) to a tuple
#     (mode name, range table, base unit of the computed value)
FUNCTION = {
    0x01: ("diode",        RANGE_DIODE,           "V"),
    0x02: ("frequency",    RANGE_FREQUENCY,       "Hz"),
    0x03: ("resistance",   RANGE_RESISTANCE,      "Ω"),
    # No range table is defined for temperature, so parse() fails when
    # it tries to index it (the mode is listed as untested).
    0x04: ("temperature",  None,                  "deg"),
    0x05: ("continuity",   RANGE_CONTINUITY,      "Ω"),
    0x06: ("capacitance",  RANGE_CAPACITANCE,     "F"),
    0x09: ("current",      RANGE_CURRENT_10A,     "A"),  # 10 A current
    0x0b: ("voltage",      RANGE_VOLTAGE,         "V"),
    0x0d: ("current",      RANGE_CURRENT_AUTO_UA, "A"),
    0x0e: ("current gain", RANGE_HFE,             ""),
    0x0f: ("current",      RANGE_CURRENT_AUTO_MA, "A"),
}
|
|
|
|
# Bit templates for get_bits().  Each list names bit 3 down to bit 0 of
# the respective packet byte; a literal 0 marks a bit that must be zero.

# Status byte.
STATUS = [
    "JUDGE",    # 1: °C, 0: °F (parse() also uses it to detect duty cycle)
    "SIGN",     # 1: minus sign, 0: no sign
    "BATT",     # 1: battery low
    "OL",       # 1: input overflow
]

# First option byte.
OPTION1 = [
    "HOLD",     # data hold
    "MAX",      # maximum
    "MIN",      # minimum
    0,
]

# Second option byte.
OPTION2 = [
    "DC",
    "AC",
    "AUTO",     # automatic range selection
    0,
]
|
|
|
|
def parse(packet):
    """
    The central function of this module.

    Decodes one 9-byte packet from the UT803 DMM and returns a dictionary
    with everything that could be extracted from it:

      value          reading in base units ("" while overloaded)
      unit           base unit belonging to value
      display_value  reading as a Decimal scaled like the display
                     ("" while overloaded)
      display_unit   unit shown on the display
      mode           measurement function, e.g. "voltage"
      current        "AC", "DC" or None
      peak           "max", "min" or None
      hold           True while data hold is active
      range          "auto" or "manual"
      operation      "normal" or "overload"
      battery_low    True if the low battery flag is set

    A malformed packet is reported through the exception of the step
    that fails (for instance struct.error for a wrong length, KeyError
    for an unknown function code, IndexError for an unknown range and
    ValueError for inconsistent flag bits).
    """
    (range_byte,
     digit_1000, digit_100, digit_10, digit_1,
     function_byte, status_byte,
     option1_byte, option2_byte) = struct.unpack("B"*9, packet)

    # Collect the named flag bits of the status and the two option bytes.
    flags = {}
    for raw_byte, template in ((status_byte, STATUS),
                               (option1_byte, OPTION1),
                               (option2_byte, OPTION2)):
        flags.update(get_bits(raw_byte, template))

    # Only the low nibble of the function and range bytes is evaluated.
    mode, range_table, unit = FUNCTION[function_byte & 0x0f]
    m_range = range_table[range_byte & 0x0f]
    if mode == "frequency" and flags["JUDGE"]:
        # In frequency mode a set JUDGE bit is reported as duty cycle,
        # shown with one decimal place in percent.
        mode, unit = "duty_cycle", "%"
        m_range = (1e0, 1, "%")

    # AC and DC exclude each other.
    if flags["AC"] and flags["DC"]:
        raise ValueError
    if flags["DC"]:
        current = "DC"
    elif flags["AC"]:
        current = "AC"
    else:
        current = None

    operation = "overload" if flags["OL"] else "normal"
    mrange = "auto" if flags["AUTO"] else "manual"
    battery_low = bool(flags["BATT"])
    # Data hold mode: the received value is the held one.
    hold = bool(flags["HOLD"])

    if flags["MAX"]:
        peak = "max"
    elif flags["MIN"]:
        peak = "min"
    else:
        peak = None

    display_unit = m_range[2]
    if operation == "normal":
        # Assemble the four digits (low nibble of each digit byte),
        # most significant digit first.
        counts = 0
        for digit_byte in (digit_1000, digit_100, digit_10, digit_1):
            counts = counts * 10 + (digit_byte & 0x0f)
        if flags["SIGN"]:
            counts = -counts
        # Shift the decimal point to where the display shows it ...
        display_value = Decimal(counts) / 10**m_range[1]
        # ... and scale the reading to base units.
        value = float(display_value) * m_range[0]
    else:
        # No meaningful number is available while overloaded.
        display_value = ""
        value = ""

    return {
        'value': value,
        'unit': unit,
        'display_value': display_value,
        'display_unit': display_unit,
        'mode': mode,
        'current': current,
        'peak': peak,
        'hold': hold,
        'range': mrange,
        'operation': operation,
        'battery_low': battery_low,
    }
|
|
|
|
def output_readable(results):
    """
    Format a parsed measurement as one human readable line.

    results: dictionary as returned by parse(); the keys "operation" and
             "battery_low" are always read, "display_value" and
             "display_unit" only for a normal measurement.

    Returns "<display_value> <display_unit>" for a normal measurement,
    otherwise a notice naming the operation state.  " Battery low!" is
    appended when the battery flag is set.
    """
    operation = results["operation"]
    if operation == "normal":
        line = "{value} {unit}".format(value=results["display_value"],
                                       unit=results["display_unit"])
    else:
        line = "-, the measurement is {operation}ed!".format(operation=operation)
    if results["battery_low"]:
        # Strings are immutable and have no append(); concatenate instead.
        line += " Battery low!"
    return line
|
|
|
|
def format_field(results, field_name):
    """
    Helper function for output formatting.

    Turns results[field_name] into the text used for one output column:

      * the "value" field is only printed for a normal measurement and
        is empty otherwise (this reads results["operation"]),
      * None becomes an empty string,
      * True / False become "1" / "0",
      * everything else is converted with str().
    """
    value = results[field_name]
    if field_name == "value":
        return str(value) if results["operation"] == "normal" else ""
    if value is None:
        return ""
    # Check the type instead of comparing with == True / == False, which
    # would also match numbers such as 1.0 or 0.0 and drop their format.
    if isinstance(value, bool):
        return "1" if value else "0"
    return str(value)
|
|
|
|
# Keys of the parse() result that make up one CSV record, in column
# order.  main() puts a "timestamp" column in front of them.
CSV_FIELDS = [
    "value",
    "unit",
    "mode",
    "current",
    "operation",
    "peak",
    "battery_low",
    "hold",
]
|
|
def output_csv(results):
    """
    Render one parsed measurement as a CSV record.

    Every field listed in CSV_FIELDS is formatted with format_field();
    the fields are joined with semicolons.  The returned string carries
    neither a timestamp nor a line ending.
    """
    return ";".join(format_field(results, name) for name in CSV_FIELDS)
|
|
|
|
def main():
    """
    Main function: Entry point if running this module from the command line.

    Reads lines from stdin and parses every 9-byte line as a multimeter
    packet.  Depending on --mode the result is

      csv       written as one record to a CSV file (--file, or a file
                named after the start time); this is the default,
      readable  printed to stdout as a human readable line,
      plot      appended to --file as "<mode>: <value><unit>" and
                printed to stdout (--file is mandatory in this mode).

    Lines that are not valid packets are reported as warnings and
    skipped.
    """
    import argparse
    parser = argparse.ArgumentParser(description='Utility for parsing data from multimeters based on Cyrustek ES51922 chipset.')
    parser.add_argument('-m', '--mode', choices=['csv', 'plot', 'readable'],
                        default="csv",
                        help='output mode (default: csv)')
    parser.add_argument('-f', '--file',
                        help='output file')
    parser.add_argument('--verbose', action='store_true',
                        help='enable verbose output')
    args = parser.parse_args()

    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(format='%(levelname)s:%(message)s', level=log_level)

    file_name = args.file
    if args.mode == 'plot' and not file_name:
        # Without a target file every packet would fail later on, so
        # stop right away with a usage message.
        parser.error('plot mode requires an output file (-f/--file)')

    output_file = None
    if args.mode == 'csv':
        if not file_name:
            # Hours, minutes and seconds keep the default name unique.
            started = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
            file_name = "measurement_{}.csv".format(started)
        output_file = open(file_name, "w")
        logging.info('Writing to file "{}"'.format(file_name))

    try:
        if output_file is not None:
            header = "timestamp;{}\n".format(";".join(CSV_FIELDS))
            output_file.write(header)

        # readline() returns an empty string at end of input.
        for line in iter(sys.stdin.readline, ''):
            line = line.strip()
            try:
                line = line.encode('ascii')
            except UnicodeError:
                logging.warning('Not an ASCII input line, ignoring: "{}"'.format(line))
                continue
            timestamp = datetime.datetime.now().isoformat(sep=' ')

            if len(line) != 9:
                if line:
                    logging.warning('Unknown packet from multimeter: "{}", length: {}'.format(line, len(line)))
                else:
                    logging.warning('Not a response from the multimeter: ""')
                continue

            try:
                results = parse(line)
            except Exception as e:
                # parse() signals invalid packets with various exception
                # types; report the packet and go on with the next one.
                logging.warning('Error "{}" packet from multimeter: "{}"'.format(e, line))
                continue

            if args.mode == 'csv':
                line = output_csv(results)
                output_file.write("{};{}\n".format(timestamp, line))
            elif args.mode == 'readable':
                line = output_readable(results)
                # Print the time of day only, without the date.
                print(timestamp.split(" ")[1], line)
            elif args.mode == 'plot':
                ost = results['mode'] + ': '
                if results['operation'] != 'normal':
                    ost += 'overload'
                else:
                    ost += str(results['value']) + results['unit']
                # The file is reopened and closed for every reading.
                with open(file_name, 'a') as plot_file:
                    plot_file.write(ost + '\n')
                print(ost)
            else:
                raise NotImplementedError
    finally:
        if output_file is not None:
            output_file.close()
|
|
|
|
# Start the command line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|