Article Index

 

Page 229

import subprocess
import io


def load1w(pin):
    indicator = "w1-gpio"
    command = ["sudo", "dtoverlay", "w1-gpio", "gpiopin="+str(pin)]
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output, flush=True)
    if output.find(indicator) == -1:
        temp = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output, flush=True)


def getDevices():
    fdr = io.open(
        "/sys/bus/w1/drivers/w1_master_driver/w1_bus_master1/w1_master_slaves", "r")
    buffer = fdr.read()
    fdr.close()
    return buffer.split()


def getData(dev, name):
    fdr = io.open("/sys/bus/w1/devices/"+dev+"/"+name, "r")
    data = fdr.read()
    fdr.close()
    return data


load1w(4)
devs = getDevices()
name = getData(devs[0], "name")
print("Name ", name)
resolution = getData(devs[0], "resolution")
print("Resolution ", resolution)
w1_slave = getData(devs[0], "w1_slave")
print("w1_slave ", w1_slave)
temperature = getData(devs[0], "temperature")
print("temperature ", temperature)
temperature = int(temperature) / 1000
print("temperature ", temperature, "C")
alarms = getData(devs[0], "alarms")
print("Alarms ", alarms)

Page 240

from ctypes import *
from socket import *
from struct import *
import os
import subprocess
import io
from time import sleep


NLMSG_DONE = 0x3

CN_W1_IDX = 0x3
CN_W1_VAL = 0x1

W1_SLAVE_ADD = 0
W1_SLAVE_REMOVE = 1
W1_MASTER_ADD = 2
W1_MASTER_REMOVE = 3
W1_MASTER_CMD = 4
W1_SLAVE_CMD = 5
W1_LIST_MASTERS = 6

W1_CMD_READ = 0
W1_CMD_WRITE = 1
W1_CMD_SEARCH = 2
W1_CMD_ALARM_SEARCH = 3
W1_CMD_TOUCH = 4
W1_CMD_RESET = 5
W1_CMD_SLAVE_ADD = 6
W1_CMD_SLAVE_REMOVE = 7
W1_CMD_LIST_SLAVES = 8
W1_CMD_MAX = 9

NETLINK_CONNECTOR = 11

nl_seq = 0


class nlmsghdr(Structure):
    _fields_ = [("nlmsg_len", c_uint32),
                ("nlmsg_type", c_uint16),
                ("nlmsg_flags", c_uint16),
                ("nlmsg_seq", c_uint32),
                ("nlmsg_pid", c_uint32)
                ]


cnh = nlmsghdr()
cnh.nlmsg_seq = c_uint32(nl_seq)
nl_seq += 1
cnh.nlmsg_pid = c_uint32(os.getpid())
cnh.nlmsg_type = c_uint16(NLMSG_DONE)
cnh.nlmsg_flags = c_uint16(0)


class cn_msg(Structure):
    _fields_ = [("idx", c_uint32),
                ("val", c_uint32),
                ("seq", c_uint32),
                ("ack", c_uint32),
                ("len", c_uint16),
                ("flags", c_uint16),
                ]


cmsg = cn_msg()
cmsg.idx = c_uint32(CN_W1_IDX)
cmsg.val = c_uint32(CN_W1_VAL)
cmsg.seq = c_uint32(cnh.nlmsg_seq)
cmsg.ack = c_uint32(0)


class w1_netlink_msg(Structure):
    _fields_ = [("type", c_uint8),
                ("status", c_uint8),
                ("len", c_uint16),
                ("id", c_uint8*8),
                ]


msg = w1_netlink_msg()
msg.type = c_uint8(W1_MASTER_CMD)
msg.id = (c_uint8*8).from_buffer_copy(c_uint64(1))


class w1_netlink_cmd(Structure):
    _fields_ = [("cmd", c_uint8),
                ("res", c_uint8),
                ("len", c_uint16),
                ]


w1_cmd = w1_netlink_cmd()
w1_cmd.cmd = c_uint8(W1_CMD_SEARCH)
w1_cmd.len = c_uint16(0)


msg.len = c_uint16(len(bytearray(w1_cmd)) + w1_cmd.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)


s = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR)
s.connect((0, AF_NETLINK))

buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg)+bytearray(w1_cmd)
n = s.send(buffer)
print("Bytes sent", n, flush=True)
buffer2 = s.recv(65535)
print("Bytes received", len(buffer2), flush=True)

p = 0
cn_cnhr = nlmsghdr.from_buffer_copy(buffer2, p)
p = p+len(bytes(cn_cnhr))
cmsgr = cn_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(cmsgr))
msgr = w1_netlink_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(msgr))
w1_cmdr = w1_netlink_cmd.from_buffer_copy(buffer2, p)

num = int(w1_cmdr.len/8)
print("number of slaves", num)

p = p+len(bytes(w1_cmdr))
slaves = []
for i in range(num):
    slaves.append(c_uint64.from_buffer_copy(buffer2, p+i*8).value)
print([hex(slave) for slave in slaves])

# GET ACK
buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)

# READ TEMPERATURE
# SET UP SLAVE COMMAND
print()
print("Read Temperature")

msg.type = W1_SLAVE_CMD

id = slaves[0].to_bytes(8, "little")
msg.id = (c_uint8*8).from_buffer_copy(id)


# SETUP AND SEND WRITE 0x44 = CONVERT
w1_cmd.cmd = c_uint8(W1_CMD_WRITE)
w1_cmd_data = c_uint8(0x44)


cnh.nlmsg_seq = c_uint32(nl_seq)
nl_seq += 1
cmsg.seq = c_uint32(cnh.nlmsg_seq)

w1_cmd.len = c_uint16(1)
msg.len = c_uint16(len(bytearray(w1_cmd)) + w1_cmd.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)

buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg) + \
    bytearray(w1_cmd)+bytearray(w1_cmd_data)

n = s.send(buffer)
print("Bytes sent", n, flush=True)
buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)
sleep(1)

# READ SCRATCH PAD
cnh.nlmsg_seq = nl_seq
nl_seq += 1
cmsg.seq = cnh.nlmsg_seq

w1_cmd.cmd = W1_CMD_WRITE
w1_cmd_data = c_uint8(0xBE)

w1_cmd2 = w1_netlink_cmd()
w1_cmd2.cmd = c_uint8(W1_CMD_READ)


w1_cmd.len = c_uint16(1)
w1_cmd2.len = c_uint16(9)
msg.len = c_uint16(len(bytearray(w1_cmd)) +
                   len(bytearray(w1_cmd2)) + w1_cmd.len+w1_cmd2.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)

w1_cmd2_data = (c_uint8*9)(0)
buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg)+bytearray(w1_cmd) + \
    bytearray(w1_cmd_data)+bytearray(w1_cmd2)+bytearray(w1_cmd2_data)
n = s.send(buffer)
print("Bytes sent", n, flush=True)

buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)

buffer2 = s.recv(65535)
print("length of data received ", len(buffer2), flush=True)
p = 0
cn_cnhr = nlmsghdr.from_buffer_copy(buffer2, p)
# print(cn_cnhr.nlmsg_len)
p = p+len(bytes(cn_cnhr))
cmsgr = cn_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(cmsgr))
msgr = w1_netlink_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(msgr))
w1_cmdr = w1_netlink_cmd.from_buffer_copy(buffer2, p)
p = p+len(bytes(w1_cmdr))

temp = c_uint16.from_buffer_copy(buffer2, p)

print("number of bytes found ", w1_cmdr.len, flush=True)
print(temp.value/16)