* added cmd line script to read out and set flow control (red-y series by Voegtlin)

This commit is contained in:
Markus Rosenstihl 2016-02-18 17:09:02 +00:00
parent 6dc67edaaf
commit 50b6bf6622

210
src/tools/flow_control.py Executable file
View File

@ -0,0 +1,210 @@
#!/usr/bin/env python2
# -*- encoding: utf-8 -*-
import optparse
import os, sys
import serial
import numpy as N
import struct
import binascii
import time
DEBUG = False
"""
6.2.2 CRC Generation
====================
The Cyclical Redundancy Checking (CRC) field is two bytes, containing a 16bit binary value. The CRC value is calculated by the
transmitting device, which appends the CRC to the message. The device that receives recalculates a CRC during receipt of the
message, and compares the calculated value to the actual value it received in the CRC field. If the two values are not equal, an error
results.
The CRC is started by first preloading a 16bit register to all 1s. Then a process begins of applying successive 8bit bytes of the
message to the current contents of the register. Only the eight bits of data in each character are used for generating the CRC. Start
and stop bits and the parity bit, do not apply to the CRC.
During generation of the CRC, each 8bit character is exclusive ORed with the register contents. Then the result is shifted in the
direction of the least significant bit (LSB), with a zero filled into the most significant bit (MSB) position. The LSB is extracted and
examined. If the LSB was a 1, the register is then exclusive ORed with a preset, fixed value. If the LSB was a 0, no exclusive OR takes
place.
This process is repeated until eight shifts have been performed. After the last (eighth) shift, the next 8bit character is exclusive ORed
with the registers current value, and the process repeats for eight more shifts as described above. The final content of the register,
after all the characters of the message have been applied, is the CRC value.
A procedure for generating a CRC is:
1. Load a 16bit register with FFFF hex (all 1s). Call this the CRC register.
2. Exclusive OR the first 8bit byte of the message with the loworder byte of the 16bit CRC register, putting the result in the
CRC register.
3. Shift the CRC register one bit to the right (toward the LSB), zerofilling the MSB. Extract and examine the LSB.
4. (If the LSB was 0): Repeat Step 3 (another shift).
(If the LSB was 1): Exclusive OR the CRC register with the polynomial value 0xA001 (1010 0000 0000 0001).
5. Repeat Steps 3 and 4 until 8 shifts have been performed. When this is done, a complete 8bit byte will have been
processed.
6. Repeat Steps 2 through 5 for the next 8bit byte of the message. Continue doing this until all bytes have been processed.
7. The final content of the CRC register is the CRC value.
8. When the CRC is placed into the message, its upper and lower bytes must be swapped as described below.
Placing the CRC into the Message
When the 16bit CRC (two 8bit bytes) is transmitted in the message, the low-order byte will be transmitted first, followed by the high-
order byte.
"""
crc_test = "\x0d\x01\x00\x62\x00\x33"
crc_expected = 0xddd
def crc(message):
crc = 0xffff # step 1
for byte in message:
if type(byte) == type(str):
crc ^= ord(byte) # step 2: xor with lower byte of crc
else:
crc ^= byte # step 2: xor with lower byte of crc
for i in range(8): # step 5: repeat 2-4 until 8 shifts were performed
if (crc & 0x1) == 0: # step 4: check LSB
mask = 0x0
else:
mask = 0xa001
crc >>= 1 # step 3: shift one bit to the right
crc ^= mask
if DEBUG: print "Message:", [i for i in message], " CRC:", hex(crc)
return chr(crc & 0xff) + chr(
(crc >> 8) & 0xff) # return lower byte, upper byte (shift to the right, return last byte) i
class Flow:
def __init__(self, device="/dev/ttyUSB0"):
self.s = serial.Serial(port=device, timeout=0.02)
self.s.readline()
self.s.timeout = 0.2
self.address = bytearray([0x3])
self.rcommands = {
"flow": [bytearray([0x3, 0x0, 0x0, 0x0, 0x2]), ">f"],
"temperature": [bytearray([0x3, 0x0, 0x2, 0x0, 0x2]), ">f"],
"setflow": [bytearray([0x3, 0x0, 0x6, 0x0, 0x2]), ">f"],
"total": [bytearray([0x3, 0x0, 0x8, 0x0, 0x2]), ">f"],
"alarm": [bytearray([0x3, 0x0, 0xc, 0x0, 0x1]), ">H"],
"hwerror": [bytearray([0x3, 0x0, 0xd, 0x0, 0x1]), ">H"],
"unit_flow": [bytearray([0x3, 0x0, 0x16, 0x0, 0x4]), ">8s"],
"medium": [bytearray([0x3, 0x00, 0x1a, 0x0, 0x4]), ">8s"],
"device": [bytearray([0x3, 0x00, 0x23, 0x0, 0x4]), ">8s"],
"unit_total": [bytearray([0x3, 0x40, 0x48, 0x0, 0x4]), ">8s"],
}
self.wcommands = {
"flow": [bytearray([0x06, 0x0, 0x6]), ">f"]
}
def send_data(self, data_and_type):
data, type = data_and_type
if DEBUG: print "send", data_and_type, "data:", binascii.hexlify(data), "type:", type, self.address
msg = self.address + data
msg_crc = msg + crc(msg)
if DEBUG: print "send:", repr(str(msg_crc))
nbytes_sent = self.s.write(str(msg_crc))
if DEBUG: print "send:", self.s
if DEBUG: print "Sent bytes:", nbytes_sent
time.sleep(0.1) # this is necessary, otherwise no data can be received
return msg_crc
def recv_data(self):
"""
Reading Data
============
Function: 0x3
Format of the resulting string
Addr Func NoBytes Byte1...N CRCHi CRCLo
"""
message = bytearray(self.s.read(3))
if DEBUG: print "recv_data: header:", binascii.hexlify(message)
if message == "":
return
addr, func, nbytes = message
if func == 0x03:
data = self.s.read(nbytes)
if DEBUG: print "recv_data: data: %i %s" % (nbytes, binascii.hexlify(data))
message += data
crc_data = self.s.read(2)
if DEBUG: print "recv_data: crc", repr(str(crc_data))
if crc(message) != crc_data:
print "mismatch", crc(message), crc_data
return data
def get_var(self, named_type="flow"):
cmd,fmt = self.rcommands[named_type]
if DEBUG: print "get_var", cmd,fmt
self.send_data([cmd,fmt])
d = self.recv_data()
if d:
d = struct.unpack(fmt, d)[0]
return d
# msg = self.recv_data()
# print str(msg)
# float struct.unpack('>f', "4byte string")
# ushort struct.unpack('>H', "2byte string")
def current_flow(self):
# start register 0x0000
# 2 bytes
cmd = self.address + bytearray([0x3, 0x0, 0x0, 0x0, 0x2])
return cmd + crc(cmd)
def current_temperature(self):
# start register 0x0002
#
#
cmd = self.address + bytearray([0x3, 0x0, 0x2, 0x0, 0x2])
return cmd + crc(cmd)
def set_flow(self, flow):
"""
write multiple registers: 0x10
startHi
startLo
numregHi
numregLo
numbytes
"""
_flow = struct.pack(">f", flow)
# cmd = address + bytearray([0x10,0x0,0x6, 0x0, 0x2, 0x2]) + _flow
cmd1 = self.address + bytearray([0x06, 0x0, 0x6]) + _flow
self.s.write(cmd1 + crc(cmd1))
time.sleep(0.01)
# print self.s.readline()
# self.recv_data()
def cmd(self, data):
cmds = self.address + data
for i in cmds:
print hex(ord(i)),
return cmds + crc(cmds)
if __name__ == "__main__":
conf = os.path.expanduser('~/.flow')
if os.path.exists(conf):
dev = open(conf).readlines()[0].strip()
redy = Flow(device=dev)
print "Device: %6s" % (redy.get_var("device"))
print "Medium: %6s" % (redy.get_var("medium"))
print "Current Flow: %6.2f" % (redy.get_var("flow"))
print "Current Set Flow: %6.2f" % (redy.get_var("setflow"))
print "Flow Unit: %6s" % (redy.get_var("unit_flow"))
print "Total Gas: %6.2f" % (redy.get_var("total"))
print "Total Unit: %6s" % (redy.get_var("unit_total"))
print "Current Temperature: %6.2f C" % (redy.get_var("temperature"))
print "Alarm: %8i" % (redy.get_var("alarm"))
print "HW Error: %8i" % (redy.get_var("hwerror"))
if len(sys.argv) > 1:
try:
float(sys.argv[1])
except:
raise SyntaxError, "usage: %s <flow> # to set flow or empty to get current flow\nfirst line in ~/.flow defines serial tty, default: /dev/ttyUSB0" % (sys.argv[0])
print "Set flow: %6.2f"%(float(sys.argv[1]))
redy.set_flow(float(sys.argv[1]))