#!/usr/bin/python # -*- coding: utf-8 -*- import time import serial, sys import os from datetime import datetime from bitstring import BitArray import paho.mqtt.publish as publish serial_timeout=0.1 byte3=0 byte4=0 byte5=0 byte6=0 i=0 pvfaultdesc = [0]*8 pvfaultdesc[0]="load short-circuit protection" pvfaultdesc[1]="load overcurrent protection" pvfaultdesc[2]="battery low-voltage protection" pvfaultdesc[3]="reserved" pvfaultdesc[4]="solar panel array1 open-circuit breakdown(it means the controller does not detect the solar panel voltage within 24h)" pvfaultdesc[5]="solar panel array1 over-voltage protection" pvfaultdesc[6]="delayed protection phase for load overcurrent" pvfaultdesc[7]="shut the load compulsively" mqtt_broker = "mqtt.wh5" mains_need_change = 0 mains_on_ok = 0 fucker_reset_enabled = 0 #(0: enabled, 1: disabled) short_circuit_error = 0 mains_1_switchstate = 42 mains_2_switchstate = 42 def mains(port, state): global mains_1_switchstate global mains_2_switchstate if state == 'on': state == 1 elif state == 'off': state == 0 else: print("unknown state %s" % state) if port == 1: if mains_1_switchstate != state: publish.single("bikeport-1/relay/0/set", state , hostname=mqtt_broker) print("switched mains port 1 to %s" % state) mains_1_switchstate = state return if port == 2: if mains_2_switchstate != state: publish.single("bikeport-2/relay/0/set", state , hostname=mqtt_broker) print("switched mains port 2 to %s" % state) mains_2_switchstate = state return if port == 'all': mains(1, state) mains(2, state) return else: print("unknown mains port %s" % port) def get_mains_status(): print('') print('MAINS:') global mains_1_switchstate global mains_2_switchstate if mains_1_switchstate != 42: sys.stdout.write('Mains Switch 1:') sys.stdout.write('\t\t\t') sys.stdout.write(str(mains_1_switchstate)) sys.stdout.write('\n') if mains_2_switchstate != 42: sys.stdout.write('Mains Switch 2:') sys.stdout.write('\t\t\t') sys.stdout.write(str(mains_2_switchstate)) sys.stdout.write('\n\n') else: print("just started, no information about mains switch states yet") print('') def pvstatusbit(bit): if bit == '1': print("gesetzt :%s" % bit) else: print("nicht gesetzt :%s" % bit) def pvstatus(): # current status of system ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=serial_timeout) command = "\x01\x61\x00\x00\x00\x00\xBD" global byte3 global byte4 del byte3 del byte4 ser.flushInput() #clear buffer ser.write(command) #send prepared command skip=ser.read(2) #skip first two bytes byte3=ser.read(1) byte4=ser.read(1) ser.close() if ( len(byte3) > 0 and len(byte4) > 0 ): byte3=byte3.encode('hex') bitstring1=BitArray(hex=byte3) # sys.stdout.write('Inputstring:\t') # sys.stdout.write(bitstring1.bin) # sys.stdout.write('\n') pvfaultstate(bitstring1.bin) sys.stdout.write('\n') else : sys.stdout.write('Voltage communication error!') def pv_reset_lowvoltage(): print("Reset Unterspannungs Sicherung:") ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=serial_timeout) command = "\x01\x51\x00\x00\x00\x00\x39" global byte3 global byte4 ser.flushInput() #clear buffer ser.write(command) #send prepared command ser.close() print("DONE") def pv_reset_shortcuircuit(): print("Reset Kurzschluss Sicherung:") ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=serial_timeout) command = "\x01\x53\x00\x00\x00\x00\xAE" global byte3 global byte4 ser.flushInput() #clear buffer ser.write(command) #send prepared command ser.close() print("DONE") def pv_reset_overcurrent(): print("Reset Ueberlast Sicherung:") ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=serial_timeout) command = "\x01\x52\x00\x00\x00\x00\x7D" global byte3 global byte4 ser.flushInput() #clear buffer ser.write(command) #send prepared command ser.close() print("DONE") #def reset_da_fucker(): # print('would reset failures now, but disabled due to further testing needed...') def reset_da_fucker(): if fucker_reset_enabled == 1: return global short_circuit_error print('here be dragons') cnt=0 sleep_after_reset=2 while cnt<=10: try: print('foo') now = datetime.now() print(now) pv_reset_overcurrent() time.sleep(0.5) pv_reset_lowvoltage() print("aktive Fehler nach reset:") pvstatus() print("erneuter reset nach %s Sekunden" % sleep_after_reset) time.sleep(sleep_after_reset) cnt = cnt+1 except KeyboardInterrupt: print("\n\nkilled via CTRL+C - Bye") sys.exit() short_circuit_error = 0 def pvfaultstate(bytearray): global pvfaultdesc global short_circuit_error # print("Inputstring Funktion:\t %s" %bytearray) i=0 arraylen=len(bytearray) # print (" Array ist %s Elemente lang" %(arraylen)) while i <=arraylen-1: if bytearray[i] =='1': print("Fehler aktiv: %s (Byte: %s)" % (pvfaultdesc[i], i) ) #pvstatusbit(bytearray[i]) if i<=arraylen: i=i+1 if bytearray[0] == '1': short_circuit_error = 1 # last anschalten def pvswitch(state): ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=serial_timeout) if state == 0: command = "\x01\x55\x00\x00\x00\x00\x26" #last aus print ("demanding controller removing closing the load compulsively") else: command = "\x01\x54\x00\x00\x00\x00\xF5" #last an print ("demanding controller closing the loads compulsively") ser.write(command) ser.close() def get_PV_Ah(): ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=serial_timeout) command = "\x01\x05\x00\x00\x00\x00\x9B" # the back transmission data from the controlleris the accumulative generating power data of solar panel array1(Ah, DC/DC output), and the data are the long data shaping with symbol, and the accumulative Ah scaling is 1Ah/LSB. ser.flushInput() #clear buffer ser.write(command) #send prepared command skip=ser.read(2) #skip first two bytes byte3=ser.read(1) byte4=ser.read(1) byte5=ser.read(1) byte6=ser.read(1) ser.close() if ( len(byte3) > 0 and len(byte4) > 0 and len(byte5) > 0 and len(byte6) >0 ): print ("PV: \t\t\t %s Ah" % (BitArray(hex=byte6.encode('hex'))+BitArray(hex=byte5.encode('hex'))+BitArray(hex=byte4.encode('hex'))+BitArray(hex=byte3.encode('hex'))).uint) else: print('communication error: get_PV_Ah') def get_load_Ah(): ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=serial_timeout) command = "\x01\x16\x00\x00\x00\x00\xA3" # the back transmission data from the controlleris the accumulative generating power data of solar panel array1(Ah, DC/DC output), and the data are the long data shaping with symbol, and the accumulative Ah scaling is 1Ah/LSB. ser.flushInput() #clear buffer ser.write(command) #send prepared command skip=ser.read(2) #skip first two bytes byte3=ser.read(1) byte4=ser.read(1) byte5=ser.read(1) byte6=ser.read(1) ser.close() if ( len(byte3) > 0 and len(byte4) > 0 and len(byte5) > 0 and len(byte6) >0 ): print ("Load: \t\t\t %s Ah" % (BitArray(hex=byte6.encode('hex'))+BitArray(hex=byte5.encode('hex'))+BitArray(hex=byte4.encode('hex'))+BitArray(hex=byte3.encode('hex'))).uint) else: print('communication error: get_load_Ah') while True: try: now = datetime.now() os.system('clear') print(now) # pvswitch(1) # battery voltage ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=serial_timeout) command = "\x01\x10\x00\x00\x00\x00\x2B" del byte3 del byte4 ser.flushInput() #clear buffer ser.write(command) #send prepared command skip=ser.read(2) #skip first two bytes byte3=ser.read(1) byte4=ser.read(1) ser.close() if ( len(byte3) > 0 and len(byte4) > 0 ): value1=ord(byte3) value2=ord(byte4) voltage=(value2 * 256 + value1) / 100 voltage_modulo=(value2 * 256 + value1) % 100 if voltage_modulo <10: test_modulo=str(str('0')+ str(voltage_modulo)) else: test_modulo = voltage_modulo battery_voltage = float(str(voltage)+"."+str(test_modulo)) sys.stdout.write('BATTERY:\t\t') print(battery_voltage) publish.single("wh5/bikeport/pv/battery/voltage", battery_voltage, hostname="outpost.flashfingaz.de") # sys.stdout.write(str(voltage)) # sys.stdout.write('.') # sys.stdout.write(str(voltage_modulo)) # sys.stdout.write(' V\n') else : sys.stdout.write('Voltage communication error!') # pv voltage/current ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=serial_timeout) command = "\x01\x01\x00\x00\x00\x00\x84" del byte3 del byte4 del byte5 del byte6 ser.flushInput() #clear buffer ser.write(command) #send prepared command skip=ser.read(2) #skip first two bytes byte3=ser.read(1) byte4=ser.read(1) byte5=ser.read(1) byte6=ser.read(1) ser.close() if ( len(byte3) > 0 and len(byte4) > 0 and len(byte5) > 0 and len(byte6) > 0 ): value1=ord(byte3) value2=ord(byte4) value3=ord(byte5) value4=ord(byte6) voltage=(value2 * 256 + value1) / 100 voltage_modulo=(value2 * 256 + value1) % 100 if voltage_modulo <10: test_modulo=str(str('0')+ str(voltage_modulo)) else: test_modulo = voltage_modulo voltage_test = float(str(voltage)+"."+str(test_modulo)) current=(value4 *256 + value3) / 100 current_modulo=(value4 * 256 + value3) % 100 if current_modulo <10: test_modulo=str(str('0')+ str(current_modulo)) else: test_modulo = current_modulo current_test = float(str(current)+"."+str(test_modulo)) sys.stdout.write('PV INPUT:\t\t') sys.stdout.write(str(voltage_test)) # sys.stdout.write(str(voltage)) # sys.stdout.write('.') # sys.stdout.write(str(voltage_modulo)) # sys.stdout.write(' V') sys.stdout.write('\t') sys.stdout.write(str(current_test)) # sys.stdout.write(str(current)) # sys.stdout.write('.') # sys.stdout.write(str(current_modulo)) sys.stdout.write(' A \n') publish.single("wh5/bikeport/pv/module/voltage", voltage_test, hostname="outpost.flashfingaz.de") publish.single("wh5/bikeport/pv/module/current", current_test, hostname="outpost.flashfingaz.de") else: print("pv load communication error") # current load ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=serial_timeout) command = "\x01\x14\x00\x00\x00\x00\x34" del byte3 del byte4 ser.flushInput() #clear buffer ser.write(command) #send prepared command skip=ser.read(2) #skip first two bytes byte3=ser.read(1) byte4=ser.read(1) ser.close() if ( len(byte3) > 0 and len(byte4) > 0 ): value1=ord(byte3) value2=ord(byte4) current=(value2 * 256 + value1) / 100 current_modulo=(value2 * 256 + value1) % 100 sys.stdout.write('LOAD:\t\t\t\t') sys.stdout.write(str(current)) sys.stdout.write('.') sys.stdout.write(str(current_modulo)) sys.stdout.write(' A\n') # temperature & battery capacity ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=serial_timeout) command = "\x01\x11\x00\x00\x00\x00\xF8" del byte3 del byte4 del byte5 del byte6 ser.flushInput() #clear buffer ser.write(command) #send prepared command skip=ser.read(2) #skip first two bytes byte3=ser.read(1) byte4=ser.read(1) byte5=ser.read(1) byte6=ser.read(1) ser.close() if ( len(byte3) > 0 and len(byte4) > 0 and len(byte5) > 0 and len(byte6) > 0 ): value1=ord(byte3) value2=ord(byte4) value3=ord(byte5) value4=ord(byte6) temperature=(value2 * 256 + value1) capacity=(value4 * 256 + value3) sys.stdout.write('Temperature:\t\t\t\t') sys.stdout.write(str(temperature)) sys.stdout.write(' °C\n') sys.stdout.write('Battery Capacity:\t\t\t') sys.stdout.write(str(capacity)) sys.stdout.write(' %\n') print("\n\naktive Fehler:") pvstatus() get_PV_Ah() get_load_Ah() get_mains_status() if short_circuit_error == 1: print("short circuit protection active, probably in error, trying to reset...") reset_da_fucker() # sleep 10 seconds before starting over if capacity < 60: mains_need_change = mains_need_change + 1 print ('batter capacity low!') print("Cycles with low Battery Capacity: %s" % mains_need_change) if mains_need_change >= 3: print("battery capacity low, switching off all mains ports") battery_state = 'discharged' mains("all", "off") mains_need_change = 0 elif battery_voltage >14: print("battery capacity ok") mains_on_ok = mains_on_ok + 1 print("Cycles with high Battery Capacity: %s" % mains_on_ok) if mains_on_ok >= 30: mains('all', 'on') battery_state = 'charged' mains_on_ok = 0 else: print('no need to switch mains right now...') time.sleep(10) except KeyboardInterrupt: print("\n\nkilled via CTRL+C - Bye") #pvswitch(0) #last ausschalten sys.exit()