ProgramsGPIO Zero Second Edition

GPIOZero2E360

We have decided not to make the programs available as a download because this is not the point of the book - the programs are not finished production code but something you should type in and study.

The best solution is to provide the source code of the programs in a form that can be copied and pasted into a VS Code project. 

 

Note: The VS Code task listings are at the very end of this page.

The only downside is that you have to create a project to paste the code into.

To do this follow the instructionin the book- in particular remember to add the bcm2835 library and also any library references that may be needed. 

All of the programs below were copy and pasted from working programs in the IDE. They have been formatted using the built in formatter and hence are not identical in layout to the programs in the book. This is to make copy and pasting them easier. The programs in the book are formatted to be easy to read on the page.

If anything you consider important is missing or if you have any requests or comments  contact:

This email address is being protected from spambots. You need JavaScript enabled to view it. 

 


Page 27

 
from gpiozero import Device
Device()
print(Device.pin_factory.board_info.model)

 


Page 32

from gpiozero import LED
from time import sleep

led = LED(4)

while True:
    led.on()
    sleep(1)
    led.off()
    sleep(1)

 

Page 52

from gpiozero import LED
from signal import pause

led = LED(4)
led.blink(on_time=1,off_time=1,n=100)
print("Program Complete")
pause()
 

Page 52

from gpiozero import LED
led = LED(4)
led.blink(on_time=1,off_time=1,n=100,background=False)
print("Program Complete")
 

Page 58

from gpiozero import LED
from time import sleep

led = LED(4)

while True:
    led.toggle()
    sleep(1)
    led.toggle()
    sleep(1)
 

Page 59

from gpiozero import Buzzer
from signal import pause

buzz = Buzzer(4)
buzz.beep(on_time=1,off_time=1,n=100)
print("Program Complete")
pause()
 

Page 61

from gpiozero import DigitalOutputDevice
class Lock(DigitalOutputDevice):
  def __init__(self,*args,**kwargs):
      if 'active_high' in kwargs:
          raise TypeError("active_high not supported")
      super().__init__(*args,**kwargs)            
  def lock(self):
      super().on()
  def unlock(self):
      super().off()
  def on(self):
      raise AttributeError("'Lock' object has no attribute 'on'")
  def off(self):
      raise AttributeError("'Lock' object has no attribute 'off'")
  def blink(self):
      raise AttributeError("'Lock' object has no attribute 'blink'")
 

Page 62

from gpiozero import LED
led1 = LED(4)
led2 = LED(17)
while True:
    led1.on()
    led2.on()
    led1.off()
    led2.off()
 

Page 65

from gpiozero import Device
from time import sleep
Device()
pin=Device.pin_factory.pin(4)
pin._set_function('output')
while True:
    pin.state=1
    sleep(1)
    pin.state=0
    sleep(1) 

Page 70

from gpiozero import Device
Device()
pin = Device.pin_factory.pin(4)
pin._set_function("output")
while True:
    pin.state=1
    pin.state=0
 

 

Page 98

from gpiozero import Button
from signal import pause
button = Button(4, bounce_time=0.25)

def pressed():
    print(“Button pressed”)

def released():
    print(“Button released”)

button.when_pressed = pressed
button.when_released = released
pause()

Page 102

from gpiozero import Button
from gpiozero import LED
from time import sleep

button = Button(4)
led1 = LED(17, initial_value=True)
led2 = LED(27)
led3 = LED(22)

state = 0
buttonState = button.value
while True:
    buttonNow = button.value
    buttonDelta = buttonNow-buttonState
    buttonState = buttonNow
    if state == 0:
        if buttonDelta == 1:
            state = 1
            led1.off()
            led2.on()
            led3.off()

        continue
    if state == 1:
        if buttonDelta == 1:
            state = 2
            led1.off()
            led2.on()
            led3.off()
        continue
    if state == 2:
        if buttonDelta == 1:
            state = 0
            led1.on()
            led2.off()
            led3.off()
        continue
    sleep(0.1)

 

Page 103 

from gpiozero import Button
from time import perf_counter_ns

button = Button(4)
while True:
    button.wait_for_press()
button.wait_for_release()
t1 = perf_counter_ns()
button.wait_for_press()
t2 = perf_counter_ns()
print((t2-t1)/1000000)
 

Page 104 

from gpiozero import Button
from time import perf_counter_ns
button = Button(4)
while True:
    while not button.value:
        pass
    while button.value:
        pass
    t1 = perf_counter_ns()
    while not button.value:
        pass
    t2 = perf_counter_ns()
    print((t2-t1)/1000000)
 

Page 104

from gpiozero import DigitalInputDevice
from time import perf_counter_ns
pulse = DigitalInputDevice(4)
while True:
    while not pulse.value:
        pass
    while pulse.value:
        pass
    t1 = perf_counter_ns()
    while not pulse.value:
        pass
    t2 = perf_counter_ns()
    print((t2-t1)/1000000)
 

Page 105

 
from gpiozero import Device
from time import perf_counter_ns

Device()
pulse = Device.pin_factory.pin(4)
pulse.input_with_pull("up")
while True:
    while not pulse.state:
        pass
    while pulse.state:
        pass
    t1 = perf_counter_ns()
    while not pulse.state:
        pass
    t2 = perf_counter_ns()
    print((t2-t1)/1000000)
 

Page 105

from gpiozero import DigitalInputDevice


class Door(DigitalInputDevice):
    def __init__(self, pin=None, pull_up=True, active_state=None,
                 bounce_time=None, pin_factory=None):
        super(Door, self).__init__(
            pin, pull_up=pull_up, active_state=active_state,
            bounce_time=bounce_time, pin_factory=pin_factory)

    @property
    def value(self):
        return super(Door, self).value


Door.is_closed = Door.is_active
Door.when_open = Door.when_deactivated
Door.when_closed = Door.when_activated
Door.wait_for_open = Door.wait_for_inactive
Door.wait_for_close = Door.wait_for_active
 
 

 

 

Page 116

 
from gpiozero import LightSensor


class Moisture(LightSensor):
    def __init__(self, pin=None, queue_len=5, charge_time_limit=0.01, threshold=0.1, partial=False, pin_factory=None):
        super(Moisture, self).__init__(pin, threshold=threshold, queue_len=queue_len,
                                       charge_time_limit=charge_time_limit, pin_factory=pin_factory)


Moisture.wait_for_dry = Moisture.wait_for_light
Moisture.wait_for_wet = Moisture.wait_for_dark
Moisture.when_dry = Moisture.when_light
Moisture.when_wet = Moisture.when_dark

Page 119

from gpiozero import PWMLED, DistanceSensor
from signal import pause

led=PWMLED(4)
dist=DistanceSensor(5,6)
led.source=dist
pause

Page 121

from gpiozero import PWMLED, DistanceSensor
from gpiozero.tools import inverted
from signal import pause
led = PWMLED(4)
dist = DistanceSensor(5, 6)
led.source = inverted(dist)
pause

Page 121

 
from gpiozero import PWMLED, DistanceSensor
from gpiozero.tools import inverted, averaged
from signal import pause

led = PWMLED(4)
front = DistanceSensor(5, 6)
back = DistanceSensor(7, 8)
led.source = averaged(front, back)
pause

Page 122

from signal import pause
from gpiozero import PWMLED, DistanceSensor
from gpiozero.tools import _normalize


def mined(*values):
    values = [_normalize(v) for v in values]
    for v in zip(*values):
        yield min(v)


led = PWMLED(4)
front = DistanceSensor(5, 6)
back = DistanceSensor(7, 8)
led.source = mined(front, back)
pause

Page 123

def PWM_values(period=360, duty=0.5):
    _ontime = int(period*duty)
    _offtime = period-_ontime
    data = [1]*_ontime+[0]*_offtime
    while True:
        for i in range(_ontime):
            print(1)
            yield 1
        for i in range(_offtime):
            print(0)
            yield 0

Page 126

from gpiozero import LED, CPUTemperature
from signal import pause
from gpiozero.tools import booleanized

cpu = CPUTemperature(min_temp=58, max_temp=90)
print(cpu.temperature)
led=LED(4)
led.source = booleanized(cpu,0,1)
pause()

Page 126

import io
from gpiozero import InternalDevice

class MemoryFree(InternalDevice):
    def __init__(self, threshold=0.9, memfile='/proc/meminfo',
                                               pin_factory=None):
        super(MemoryFree, self).__init__(pin_factory=pin_factory)
        self._fire_events(self.pin_factory.ticks(), None)
        self.memfile = memfile
        self.threshold = threshold
        self._totalmem = 0
        self._availablemem = 0

   
    @property
    def free_mem(self):
        with io.open(self.memfile, 'r') as f:
            self._totalmem = int(f.readline().strip().split()[1])
            freemem = int(f.readline().strip().split()[1])
            self._availablemem = int(f.readline().strip().split()[1])
            return freemem

    @property
    def value(self):
        return self.free_mem/self._availablemem

    @property
    def is_active(self):
        return self.value < 1-self.threshold

Page 127 You need to add the previous class to make this work

from gpiozero import PWMLED
from signal import pause

led = PWMLED(4)

mem = MemoryFree(0.1)
print(mem.value, flush=True)
print(mem.is_active, flush=True)

led.source=mem
pause()

 

Page 134

from gpiozero import PWMOutputDevice
from signal import pause
pwm = PWMOutputDevice(4)
pwm.frequency = 10000
pwm.value = 0.5
pause()

Page 136

from gpiozero import PWMOutputDevice
pwm = PWMOutputDevice(4)
pwm.frequency = 1000
while True:
    pwm.value = 0.1
    pwm.value = 0.9

Page 132

from gpiozero import PWMOutputDevice
from time import sleep
pwm = PWMOutputDevice(4)
pwm.frequency = 1000
pwm.value = 0.1
while True:
    sleep(0.5)
    pwm.toggle()

Page 139

from gpiozero import PWMOutputDevice
from time import sleep

pwm = PWMOutputDevice(4)
pwm.frequency = 1000
steps = 8
delay = 0.01
while True:
    for d in range(steps):
        pwm.value = d/steps
    sleep(delay)

for d in range(steps):
    pwm.value = (steps-d)/steps
    sleep(delay)

Page 145

from gpiozero import PWMOutputDevice
from time import sleep

pwm = PWMOutputDevice(4)
pwm.frequency = 1000
steps = 8
delay = 0.001
while True:
    for d in range(steps):
        pwm.value = d/steps
        sleep(delay)

    for d in range(steps):
        pwm.value = (steps-d)/steps
        sleep(delay)

Page 153

class uniMotor(PWMOutputDevice):
    def __init__(self, pin=None, active_high=True, initial_value=0, pin_factory=None):
        super(uniMotor, self).__init__(pin, active_high, initial_value, pin_factory=pin_factory)

    def speed(self, value):
        self._write(value)

motor = uniMotor(4)
motor.speed(0.5)
sleep(10)

Page 161

from gpiozero import Servo
from time import sleep

servo = Servo(4)

while True:
    servo.min()
    sleep(0.5)
    servo.mid()
    sleep(0.5)
    servo.max()
    sleep(0.5)

Page 162

from gpiozero import Servo

class ServoInvert(Servo):

@Servo.value.setter
  def value(self, value):
       if value is None:
            self.pwm_device.pin.frequency = None
        elif -1 <= value <= 1:
            self.pwm_device.pin.frequency = int(1 / self.frame_width)
            self.pwm_device.value = (self._min_dc + self._dc_range *((value - self._min_value) / self._value_range))
        else:
            raise OutputDeviceBadValue(
                "Servo value must be between -1 and 1, or None")

Page 171

from gpiozero import DigitalOutputDevice
from time import sleep


class StepperBi4():
    def __init__(self, A=None, Am=None, B=None, Bm=None):
        self.phase = 0
        self.gpios = tuple(DigitalOutputDevice(pin) for pin in (A, B, Am, Bm))
        self.halfstepSeq = [
            [1, 0, 0, 0],
            [1, 1, 0, 0],
            [0, 1, 0, 0],
            [0, 1, 1, 0],
            [0, 0, 1, 0],
            [0, 0, 1, 1],
            [0, 0, 0, 1],
            [1, 0, 0, 1]
        ]

    def setPhase(self, phase):
        self.phase = phase
        for gpio, state in zip(self.gpios, self.halfstepSeq[phase]):
            gpio.value = state

    def stepForward(self):
        self.phase = (self.phase+1) % 8
        self.setPhase(self.phase)

    def stepReverse(self):
        self.phase = (self.phase-1) % 8
        self.setPhase(self.phase)


step = StepperBi4(4, 17, 27, 22)
step.setPhase(0)
while True:
    sleep(0.001)
    step.stepForward()
 

 

Page 176

from gpiozero import LED
from time import sleep


class TriLED():
    def __init__(self, pin1=None, pin2=None, pin3=None):
        self.LEDs = (LED(pin1), LED(pin2), LED(pin3))

    def AllOn(self):
        self.LEDs[0].on()
        self.LEDs[1].on()
        self.LEDs[2].on()

    def AllOff(self):
        self.LEDs[0].off()
        self.LEDs[1].off()
        self.LEDs[2].off()

Page 177

 class TriLED(CompositeDevice):
       def __init__(self, pin1=None, pin2=None, pin3=None):
            super(TriLED, self).__init__(LED(pin1), LED(pin2), LED(pin3))

        def AllOn(self):
            self[0].on()
            self[1].on()
            self[2].on()

        def AllOff(self):
            self[0].off()
            self[1].off()
            self[2].off()

Page 192

from gpiozero import DigitalOutputDevice, CompositeDevice


class StepperBi4(CompositeDevice):
    def __init__(self, A=None, Am=None, B=None, Bm=None):
        if not all(p is not None for p in [A, Am, B, Bm]):
            raise GPIOPinMissing(
                'Four GPIO pins must be provided'
            )
        super(StepperUni4, self).__init__(DigitalOutputDevice(A), DigitalOutputDevice(
            B), DigitalOutputDevice(Am), DigitalOutputDevice(Bm))
        self.phase = 0
        self.halfstepSeq = [
            [1, 0, 0, 0],
            [1, 1, 0, 0],
            [0, 1, 0, 0],
            [0, 1, 1, 0],
            [0, 0, 1, 0],
            [0, 0, 1, 1],
            [0, 0, 0, 1],
            [1, 0, 0, 1]
        ]

    def setPhase(self, phase):
        self.phase = phase
        for gpio, state in zip(self, self.halfstepSeq[phase]):
            gpio.pin._set_state(state)

    def stepForward(self):
        self.phase = (self.phase+1) % 8
        self.setPhase(self.phase)

    def stepReverse(self):
        self.phase = (self.phase-1) % 8
        self.setPhase(self.phase)

Page 194

from gpiozero import DigitalOutputDevice, CompositeDevice
from time import sleep
from gpiozero.threads import GPIOThread


class StepperBi4(CompositeDevice):
    def __init__(self, A=None, Am=None, B=None, Bm=None):
        if not all(p is not None for p in [A, Am, B, Bm]):
            raise GPIOPinMissing('Four GPIO pins must be provided')
        super(StepperUni4, self).__init__(DigitalOutputDevice(A), DigitalOutputDevice(
            B), DigitalOutputDevice(Am), DigitalOutputDevice(Bm))
        self.phase = 0
        self._rotate_thread = None
        self.halfstepSeq = [
            [1, 0, 0, 0],
            [1, 1, 0, 0],
            [0, 1, 0, 0],
            [0, 1, 1, 0],
            [0, 0, 1, 0],
            [0, 0, 1, 1],
            [0, 0, 0, 1],
            [1, 0, 0, 1]
        ]

    def setPhase(self, phase):
        self.phase = phase
        for gpio, state in zip(self, self.halfstepSeq[phase]):
            gpio.pin._set_state(state)

    def stepForward(self):
        self.phase = (self.phase+1) % 8
        self.setPhase(self.phase)

    def stepReverse(self):
        self.phase = (self.phase-1) % 8
        self.setPhase(self.phase)

    def forward(self, speed=0):
        self._stop_rotate()
        if speed == 0:
            return
        self._rotate_thread = GPIOThread(
            target=self._rotate, args=(+1, speed))
        self._rotate_thread.start()

    def reverse(self, speed=1):
        self._stop_rotate()
        if speed == 0:
            return
        self._rotate_thread = GPIOThread(
            target=self._rotate,   args=(-1, speed))
        self._rotate_thread.start()

    def _rotate(self, dir, speed):
        delay = 60/(speed*400)-0.001
        while True:
            if dir == 1:
                self.stepForward()
            if dir == -1:
                self.stepReverse()
            if self._rotate_thread.stopping.wait(delay):
                break

    def _stop_rotate(self):
        if getattr(self, '_rotate_thread', None):
            self._rotate_thread.stop()
        self._rotate_thread = None
 

Page 199
 
import subprocess
temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output)
lastSPI = output.rfind("spi")
if lastSPI != -1:
    lastSPI = output.find("spi=on", lastSPI)
    if lastSPI == -1:
        temp = subprocess.Popen(
            ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print("adding", output)
else:
    temp = subprocess.Popen(
        ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print("adding", output)
 
 Page 190
 
import subprocess
temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output)
lastSPI = output.rfind("spi")
if lastSPI != -1:
    lastSPI = output.find("spi=on", lastSPI)
    if lastSPI == -1:
        temp = subprocess.Popen( ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print("adding", output)
else:
    temp = subprocess.Popen( ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print("adding", output)

Page 212
 
from gpiozero import SPIDevice

Dev=SPIDevice()

Dev.clock_mode=0
Dev.select_high=False
Dev._spi._interface.max_speed_hz=7000

words=Dev._spi.transfer([0xAA])
if words[0]==0xAA:
    print("data received correctly")
 
Page 215
from gpiozero import SPIDevice
Dev = SPIDevice()
Dev.clock_mode = 0
Dev.select_high = False
Dev._spi._interface.max_speed_hz = 60000

words = Dev._spi.transfer([0x01, 0x80, 0x00])
data = (words[1] & 0x03) << 8 | words[2]
volts = data * 3.3 / 1023.0
print(volts)
Dev.close()
 Page 220
class DS3234rtc(SPIDevice):
    def __init__(self):
        super(DS3234rtc, self).__init__()
        self.clock_mode = 1
        self.select_high = False
        self._spi._interface.max_speed_hz = 5000000

    def setDateTime(self, dateTime):
        datetimetuple = dateTime.timetuple()
        datetimetuple = (datetimetuple[0]-2000,)+datetimetuple[1:3] + \
            (dateTime.isoweekday(),)+datetimetuple[3:6]
        datetimetuple = datetimetuple[::-1]
        for i in range(0, 7):
            data = datetimetuple[i]
            data = (data//10) << 4 | (data % 10)
            words = self._spi.transfer([0x80+i, data])

    def getDateTime(self):
        datetimelist = []
        for i in range(7):
            words = self._spi.transfer([i, 0x00])
            if i == 3:
                continue
            byte = words[1]
            data = (byte & 0x0F)+(byte >> 4)*10
            datetimelist.insert(0, data)
        datetimelist[0] += 2000
        return datetime(*datetimelist)

Page 224
import lgpio

GPIOChip = lgpio.gpiochip_open(0)

lgpio.gpio_claim_output(GPIOChip, 4)

while True:
   lgpio.gpio_write(GPIOChip, 4, 0)
   lgpio.gpio_write(GPIOChip, 4, 1)

lgpio.gpiochip_close(GPIOChip)
 
Page 226
import lgpio

GPIOChip = lgpio.gpiochip_open(4)

lgpio.gpio_claim_output(GPIOChip, 4)
lgpio.gpio_claim_output(GPIOChip, 17)
while True:
   lgpio.gpio_write(GPIOChip, 4, 1)
   lgpio.gpio_write(GPIOChip, 17, 0)
   lgpio.gpio_write(GPIOChip, 4, 0)
   lgpio.gpio_write(GPIOChip, 17, 1)
lgpio.gpiochip_close(GPIOChip)
 
Page 229
import lgpio
import time

GPIOChip = lgpio.gpiochip_open(4)
lgpio.tx_pwm(GPIOChip, 4, 1000, 25)
time.sleep(1)
lgpio.tx_pwm(GPIOChip, 4, 1000, 50)
while True:
    pass
 
Page 231
import lgpio
from collections import namedtuple

Pulse = namedtuple('Pulse', ['group_bits', 'group_mask', 'pulse_delay'])

GPIOChip = lgpio.gpiochip_open(4)
lgpio.group_claim_output(GPIOChip, [4,17])

lgpio.tx_wave(GPIOChip, 4,[Pulse(1,3,1000),Pulse(2,3,1000), Pulse(1,3,1000),Pulse(2,3,1000)] )

while True:
   pass
 
Page 232
import lgpio
from collections import namedtuple

Pulse = namedtuple('Pulse', ['group_bits', 'group_mask','pulse_delay'])

GPIOChip = lgpio.gpiochip_open(4)
lgpio.group_claim_output(GPIOChip, [4,17])
while True:
    if lgpio.tx_room(GPIOChip,4,lgpio.TX_WAVE)>2:
        lgpio.tx_wave(GPIOChip, 4, [Pulse(1,3,1000),Pulse(2,3,1000)])
 
 
 
Page 227 Appendix

settings.json

{
   "sshUser": "pi",
   "sshEndpoint": "192.168.11.170",
   "remoteDirectory": "/home/pi/Documents/",    
}

tasks.json

{
    "version": "2.0.0",
    "tasks": [
        {
            "label": "copyToRemote",
            "type": "shell",
            "command": "scp -r ${fileDirname} ${config:sshUser}@${config:sshEndpoint}:${config:remoteDirectory}/",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
                "clear": true
            }
        },
        {
            "label": "makeRemoteWorkSpace",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} ;'mkdir  ${config:remoteDirectory}'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "RunR",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} ;'python3 ${config:remoteDirectory}/${relativeFileDirname}/${fileBasename}'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "RunRemote",
            "dependsOrder": "sequence",
            "dependsOn": [
                "copyToRemote",
                "RunR"
            ],
            "problemMatcher": [],
            "group": {
                "kind": "build",
                "isDefault": true
            },
        },
        
        {
            "label": "StopRemotePython",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} ;'pkill python3'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": true,
            }
        },
        {
            "label": "wait",
            "type": "shell",
            "command": "timeout 10"
        },
        {
            "label": "tunnel",
            "type": "shell",
            "command": "ssh -2 -L 5678:localhost:5678 ${config:sshUser}@${config:sshEndpoint}",
            "problemMatcher": [],
            "presentation": {                
                "showReuseMessage": false,             
            }
        },
        {
            "label": "startDebug",
            "type": "shell",
            "command": "ssh -2 ${config:sshUser}@${config:sshEndpoint} ; 
                   'nohup python3 -m debugpy --listen 0.0.0.0:5678 --wait-for-client             
                               ${config:remoteDirectory}/${relativeFileDirname}/${fileBasename} > /dev/null 2>&1 &'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "copyAndDebug",
            "dependsOrder": "sequence",
            "dependsOn": [
                "copyToRemote", 
                "startDebug",
                "wait"
            ],
            "presentation": {               
                "showReuseMessage": false,                
            },
        },
    ]
}
 

 launch.json

 
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Remote Attach",
            "type": "python",  
            "connect": {
                "host": "localhost",
                "port": 5678
            },
            "pathMappings": [
                {
                    "localRoot": "${workspaceFolder}/
                                      ${relativeFileDirname}/",
                    "remoteRoot": "${config:remoteDirectory}/
                                      ${relativeFileDirname}"
                }
            ],
            "request": "attach",
            "preLaunchTask": "copyAndDebug",
            "postDebugTask": "StopREmotePython"
        }
    ]
}
 

Raspberry Pi IoT In Python Using Linux Drivers

Programs

Cdrivers360

We have decided not to make the programs available as a download because this is not the point of the book - the programs are not finished production code but something you should type in and study.

The best solution is to provide the source code of the programs in a form that can be copied and pasted into a NetBeans of VS Code project. 

 

Note: The VS Code task listings are at the very end of this page.

The only downside is that you have to create a project to paste the code into.

To do this follow the instructionin the book-- in particular make sure you have added any libraries to the link step and are running as root if required. 

All of the programs below were copy and pasted from working programs in the IDE. They have been formatted using the built in formatter and hence are not identical in layout to the programs in the book. This is to make copy and pasting them easier. The programs in the book are formatted to be easy to read on the page.

If anything you consider important is missing or if you have any requests or comments  contact:

This email address is being protected from spambots. You need JavaScript enabled to view it. 

Page 32

import io
from time import sleep

fd = io.open("/sys/class/leds/led0/trigger", "w")
fd.write("none")
fd.close()

fd = io.open("/sys/class/leds/led0/brightness", "w")
while(True):
    fd.write("0")
    fd.flush()
    sleep(1)
    fd.write("1")
    fd.flush()
    sleep(1)

 

Page 33

import io
from time import sleep

fd = io.open("/sys/class/leds/led0/trigger", "w")
fd.write("timer")
fd.close()

fd = io.open("/sys/class/leds/led0/delay_on", "w")
fd.write("2000")
fd.close()

fd = io.open("/sys/class/leds/led0/delay_off", "w")
fd.write("3000")
fd.close()

Page 42

import gpiod
chip = gpiod.Chip("0")
print(chip.label())
print(chip.name())
print(chip.num_lines())
chip.close()

Page 45

import gpiod

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py",
             type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
while(True):
    line.set_value(1)
    line.set_value(0)

Page 46

import gpiod

chip = gpiod.Chip("0")
lines = chip.get_lines([4, 17])

lines.request(consumer="myprog.py", type=gpiod.LINE_REQ_DIR_OUT,
              default_vals=[0, 1])
while(True):
    lines.set_values([1, 0])
    lines.set_values([0, 1])

Page 48

import gpiod
from time import sleep, time_ns

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_IN)
sleep(0.010)
line.release()

line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_OUT,
             default_vals=[0])
sleep(0.010)
line.release()

line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_IN)
t = time_ns()
while(line.get_value() == 0):
    pass
print((time_ns()-t)/1000)

 

Page 55

import fcntl
import struct
import io
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401

f = io.open("/dev/gpiochip0", "rb", buffering=0)

gpiochip_info = struct.Struct("32s 32s L")
buffer = gpiochip_info.pack(b' ', b' ', 0)

result = fcntl.ioctl(f, GPIO_GET_CHIPINFO_IOCTL, buffer)

name, label, lines = gpiochip_info.unpack(result)

print(name.rstrip(b'\0').decode("utf-8"))
print(label.rstrip(b'\0').decode("utf-8"))
print(lines)

Page 59

import fcntl
import struct
import io

GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408

GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10

gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")

lines = [0]*64
lines[0] = 4
lines[1] = 17
values = [0]*64
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_OUTPUT,
                                 *values, b"Output test", 2, 0)

f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
f.close()
fL = struct.unpack_from("L", buffer, 360)[0]

values[0] = 1
values[1] = 0
state1 = gpiohandle_data.pack(*values)

values[0] = 0
values[1] = 1
state2 = gpiohandle_data.pack(*values)

while(True):
    result = fcntl.ioctl(fL, GPIOHANDLE_SET_LINE_VALUES_IOCTL, state1)
    result = fcntl.ioctl(fL, GPIOHANDLE_SET_LINE_VALUES_IOCTL, state2)

Page 60

import fcntl
import struct
import io
import os

GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408

GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10

gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")

lines = [0]*64
lines[0] = 4
lines[1] = 17
values = [0]*64
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_INPUT,
                                 *values, b"Input test", 2, 0)

f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
f.close()
fL = struct.unpack_from("L", result, 360)[0]
inputBuffer = gpiohandle_data.pack(*values)
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL,
                     inputBuffer)
os.close(fL)
print(result[0], result[1])

Page 61

import fcntl
import struct
import io
import os
from time import sleep, time_ns
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408

GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10

gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")

lines = [0]*64
lines[0] = 4
values = [0]*64

buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_INPUT,
                                 *values, b"Resistance test", 1, 0)

f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
fL = struct.unpack_from("L", result, 360)[0]
state = gpiohandle_data.pack(*values)
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, state)
sleep(0.01)
os.close(fL)

buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_OUTPUT,
                                 *values, b"Resistance test", 1, 0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
fL = struct.unpack_from("L", result, 360)[0]
fcntl.ioctl(fL, GPIOHANDLE_SET_LINE_VALUES_IOCTL, state)
sleep(0.01)
os.close(fL)

buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_INPUT,
                                 *values, b"Resistance test", 1, 0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
fL = struct.unpack_from("L", result, 360)[0]
t = time_ns()
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, state)
while(result[0] == 0):
    result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, state)
print((time_ns()-t)/1000)
os.close(fL)
f.close()

Page 68

import gpiod

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)

event = line.event_wait(sec=1)
if event:
    print("Event on line 4")
else:
    print("Time out")

Page 69

import gpiod
from time import sleep, time_ns

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_IN)
sleep(0.010)
line.release()

line.request(consumer="resistor.py",
             type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
sleep(0.010)
line.release()

line.request(consumer="resistor.py",
             type=gpiod.LINE_REQ_EV_RISING_EDGE)
t = time_ns()
event = line.event_read()
print((event.sec*1000000000+event.nsec-t)/1000)

Page 70

import gpiod

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)

while(True):
    while True:
        event1 = line.event_read()
        if event1.type == event1.RISING_EDGE:
            break
    while True:
        event2 = line.event_read()
        if event2.type == gpiod.LineEvent.FALLING_EDGE:
            break
    print((event2.sec-event2.sec)*1000000000 +
          (event2.nsec-event1.nsec))

Page 71a

import gpiod
from time import sleep

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_DIR_IN)
print("Press the button", flush=True)
sleep(20)
if line.get_value() == 1:
    print("button pressed")
else:
    print("button not pressed")

Page 71b

import gpiod
from time import sleep

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)
print("Press the button", flush=True)
sleep(20)
event = line.event_wait(sec=0)
if event:
    print("button pressed")
else:
    print("button not pressed")

Page 72

import gpiod


def event_poll(line):
    event = line.event_wait(sec=0)
    if event:
        event = line.event_read()
    return event


chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)
while True:
    event = event_poll(line)
    if event:
        print(event.sec, flush=True)

Page 75

import fcntl
import struct
import io
import os
from time import sleep, time_ns
import select

GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408
GPIO_GET_LINEEVENT_IOCTL = 0xC030B404

GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10

GPIOEVENT_REQUEST_RISING_EDGE = 0x01
GPIOEVENT_REQUEST_FALLING_EDGE = 0x02
GPIOEVENT_REQUEST_BOTH_EDGES = 0x03

gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")
gpioevent_request = struct.Struct("L L L 32s L")
gpioevent_data = struct.Struct("Q L")

buffer = gpioevent_request.pack(4, GPIOHANDLE_REQUEST_INPUT,
                                GPIOEVENT_REQUEST_BOTH_EDGES, b"Event test", 0)

f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEEVENT_IOCTL, buffer)
f.close()
fL = struct.unpack_from("L", result, 44)[0]

e = select.epoll()
e.register(fL)
while True:
    events = e.poll(1)
    if len(events) > 0:
        buffer = os.read(fL, 100)
        timestamp, id = gpioevent_data.unpack(buffer[0:12])
        print(timestamp, id, flush=True)

Page 76

import gpiod
import struct
import io
import os
import select

gpioevent_data = struct.Struct("Q L")

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)

fL = line.event_get_fd()
e = select.epoll()
e.register(fL)
while True:
    events = e.poll(1)
    if len(events) > 0:
        buffer = os.read(fL, 100)
        timestamp, id = gpioevent_data.unpack(buffer[0:12])
        print(timestamp, id, flush=True)

Page 77

import gpiod
from time import sleep
import threading


def waitForInterrupt(line):
    while(True):
        event = line.event_read()
        print(event.sec*1000000000+event.nsec, flush=True)


chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)

IntThread = threading.Thread(target=waitForInterrupt, args=(line,))
IntThread.start()


while True:
    sleep(2)
    print("running", flush=True)

Page 87

import io

fdr = io.open("/sys/bus/iio/devices/iio:device0/name", "rb", buffering=0)
name = fdr.readline().decode("utf-8")
print("name=", name)
fdr.close()

fdr = io.open("/sys/bus/iio/devices/iio:device0/in_temp_input",
              "rb", buffering=0)
temp = fdr.readline().decode("utf-8")
print("temp=", int(temp)/1000, "C")
fdr.close()

fdr = io.open(
    "/ sys/bus/iio/devices/iio: device0 /in_humidityrelative_input", "rb", buffering=0)
hum = fdr.readline().decode("utf-8")
print("Humidity=", int(hum)/1000, "%")
fdr.close()

Page 89

import subprocess
import io
import fcntl


def checkDht11():
    indicator = "dht11  gpiopin=4"
    command = ["sudo", "dtoverlay", "dht11", "gpiopin=4"]
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output)
    if output.find(indicator) == -1:
        temp = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output)
    return


checkDht11()

fdr = io.open("/sys/bus/iio/devices/iio:device0/name", "rb", buffering=0)
name = fdr.readline().decode("utf-8")
print("name=", name)
fdr.close()

fdr = io.open("/sys/bus/iio/devices/iio:device0/in_temp_input",
              "rb", buffering=0)
temp = fdr.readline().decode("utf-8")
print("temp=", int(temp)/1000, "C")
fdr.close()

fdr = io.open(
    "/sys/bus/iio/devices/iio:device0/in_humidityrelative_input", "rb", buffering=0)
hum = fdr.readline().decode("utf-8")
print("Humidity=", int(hum)/1000, "%")
fdr.close()

Page 111a

import gpiod
from time import sleep

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py",
             type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])

period = 20
duty = 25

ontime = period/1000 * duty / 100
offtime = period/1000 - ontime

while(True):
    line.set_value(1)
    sleep(ontime)
    line.set_value(0)
    sleep(offtime)

Page 111b

import gpiod
from time import sleep
import threading


def softPWM(line, period, duty):
    ontime = period/1000 * duty / 100
    offtime = period/1000 - ontime
    while(True):
        line.set_value(1)
        sleep(ontime)
        line.set_value(0)
        sleep(offtime)


chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py",
             type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])

period = 20
duty = 75

IntThread = threading.Thread(target=softPWM, args=(line, period, duty))
IntThread.start()

while(True):
    print("working", flush=True)
    sleep(2)

Page 115

import subprocess
import io


def checkPWM():
    indicator = "pwm-2chan"
    command = ["sudo", "dtoverlay", "pwm-2chan"]
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output)
    if output.find(indicator) == -1:
        temp = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output)
    return


checkPWM()

fdw = io.open("/sys/class/pwm/pwmchip0/export", "wb", buffering=0)
fdw.write(b"0")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip0/pwm0/period", "wb", buffering=0)
fdw.write(b"10000000")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip0/pwm0/duty_cycle", "wb", buffering=0)
fdw.write(b"8000000")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip0/pwm0/enable", "wb", buffering=0)
fdw.write(b"1")
fdw.close()

Page 116

import subprocess
import io
from time import sleep
import os


class Pwm:
    def __init__(self, channel, f, duty):
        if not(channel == 0 or channel == 1):
            return
        self.chan = str(channel)
        indicator = "pwm-2chan"
        command = ["sudo", "dtoverlay", "pwm-2chan"]
        temp = subprocess.Popen(
            ["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output, flush=True)
        if output.find(indicator) == -1:
            temp = subprocess.Popen(command, stdout=subprocess.PIPE)
            output = str(temp.communicate())
            print(output, flush=True)

        if not(os.path.exists("/sys/class/pwm/pwmchip0/pwm"+self.chan)):
            fdw = io.open("/sys/class/pwm/pwmchip0/export", "w")
            fdw.write(self.chan)
            fdw.close()

        while not(os.path.exists("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable")):
            pass

        self.fdwp = io.open("/sys/class/pwm/pwmchip0/pwm" +
                            self.chan+"/period", "w")
        self.setFreq(f)
        self.fdwd = io.open("/sys/class/pwm/pwmchip0/pwm" +
                            self.chan+"/duty_cycle", "w")
        self.setDuty(duty)

    def setFreq(self, f):
        self.f = int(1000000000/f)
        self.fdwp.write(str(self.f))
        self.fdwp.flush()

    def setDuty(self, duty):
        self.duty = int(self.f*duty/100)
        self.fdwd.write(str(self.duty))
        self.fdwd.flush()

    def enableChan(self):
        fdw = io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable", "w")
        fdw.write("1")
        fdw.close()

    def disableChan(self):
        fdw = io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable", "w")
        fdw.write("0")
        fdw.close()

    def closeChan(self):
        self.fdwd.close()
        self.fdwp.close()
        fdw = io.open("/sys/class/pwm/pwmchip0/unexport", "w")
        fdw.write(self.chan)
        fdw.close()

Page 127

import subprocess
import io
from time import sleep
import os


class Pwm:
    def __init__(self, channel, f, duty):
        if not(channel == 0 or channel == 1):
            return
        self.chan = str(channel)
        indicator = "pwm-2chan"
        command = ["sudo", "dtoverlay", "pwm-2chan"]
        temp = subprocess.Popen(
            ["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output, flush=True)
        if output.find(indicator) != -1:
            temp = subprocess.Popen(
                ["sudo", "dtoverlay", "-r", "0"], stdout=subprocess.PIPE)
            output = str(temp.communicate())
            print(output, flush=True)

        temp = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output, flush=True)

        if os.path.exists("/sys/class/pwm/pwmchip0/pwm"+self.chan):
            fdw = io.open("/sys/class/pwm/pwmchip0/unexport", "w")
            fdw.write(self.chan)
            fdw.close()

        while os.path.exists("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable"):
            pass
        fdw = io.open("/sys/class/pwm/pwmchip0/export", "w")
        fdw.write(self.chan)
        fdw.close()

        while not(os.path.exists("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/period")):
            pass

        self.fdwp = io.open("/sys/class/pwm/pwmchip0/pwm" +
                            self.chan+"/period", "w")
        self.setFreq(f)
        self.fdwd = io.open("/sys/class/pwm/pwmchip0/pwm" +
                            self.chan+"/duty_cycle", "w")
        self.setDuty(duty)

    def setFreq(self, f):
        self.fdwp.write("200")
        self.fdwp.flush()
        self.f = int(1000000000/f)
        self.fdwp.write(str(self.f))
        self.fdwp.flush()

    def setDuty(self, duty):
        self.duty = int(self.f*duty/100)
        self.fdwd.write(str(self.duty))
        self.fdwd.flush()

    def enableChan(self):
        fdw = io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable", "w")
        fdw.write("1")
        fdw.close()

    def disableChan(self):
        fdw = io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable", "w")
        fdw.write("0")
        fdw.close()

    def closeChan(self):
        self.fdwd.close()
        self.fdwp.close()
        fdw = io.open("/sys/class/pwm/pwmchip0/unexport", "w")
        fdw.write(self.chan)
        fdw.close()

    def inverChan(self):
        fdw = io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/polarity", "w")
        fdw.write("inversed")
        fdw.close()

    def normalChan(self):
        fdw = io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/polarity", "w")
        fdw.write("normal")
        fdw.close()

Page 137

import subprocess
import spidev
from time import sleep


def checkSPI():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    lastSPI = output.rfind("spi")
    if lastSPI != -1:
        lastSPI = output.find("spi=on", lastSPI)
    if lastSPI == -1:
        temp = subprocess.Popen(
            ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
        output = str(temp.communicate())


checkSPI()

spi = spidev.SpiDev()
spi.open(0, 0)
spi.max_speed_hz = 5000
Data = [0xAA]
print(hex(Data[0]))
data = spi.xfer(Data)
print(hex(Data[0]))
if Data == 0xAA:
    print("match")
spi.close()

Page 146

import subprocess
import spidev


def checkSPI():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    lastSPI = output.rfind("spi")
    if lastSPI != -1:
        lastSPI = output.find("spi=on", lastSPI)
    if lastSPI == -1:
        temp = subprocess.Popen(
            ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
        output = str(temp.communicate())


checkSPI()

spi = spidev.SpiDev()
spi.open(0, 0)
spi.max_speed_hz = 100000
spi.mode = 0
spi.bits_per_word = 8

Data = [0x01, 0x80, 0x00]
spi.xfer(Data)
raw = (Data[1] & 0x03) << 8 | Data[2]
print(raw)
volts = raw * 3.3 / 1023.0
print(volts, "V")
spi.close()

Page 165

import subprocess
import io
import fcntl


def checkI2CBus():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"],
                            stdout=subprocess.PIPE)
    output = str(temp.communicate())
    lasti2c = output.rfind("i2c_arm")
    if lasti2c != -1:
        lasti2c = output.find("i2c_arm=on", lasti2c)
    if lasti2c == -1:
        temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
                                stdout=subprocess.PIPE)
        output = str(temp.communicate())
    return


checkI2CBus()

I2C_SLAVE = 0x0703

fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)
fdw.write(bytearray([0xE7]))
data = fdr.read(1)
print(data)
fdr.close()
fdw.close()

Page 167

import subprocess
import io
import fcntl
from time import sleep

checkI2CBus()
I2C_SLAVE = 0x0703

fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)

fdw.write(bytearray([0xF3]))
while(True):
    try:
        data = fdr.read(3)
        break
    except:
        sleep(0.01)
msb = data[0]
lsb = data[1]
crc = data[2]
print("msb=", msb, " lsb=", lsb, " crc=", crc)

fdw.close()
fdr.close()

Page 171

import subprocess
import io
import fcntl
from time import sleep


def crcCheck(msb, lsb, check):
    data32 = (msb << 16) | (lsb << 8) | check
    divisor = 0x988000
    for i in range(16):
        if(data32 & 1 << (23 - i)):
            data32 ^= divisor
        divisor >>= 1
    return data32


def checkI2CBus():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"],
                            stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output)
    lasti2c = output.rfind("i2c_arm")
    if lasti2c != -1:
        lasti2c = output.find("i2c_arm=on", lasti2c)
    if lasti2c == -1:
        temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
                                stdout=subprocess.PIPE)
        output = str(temp.communicate())
    return


checkI2CBus()
I2C_SLAVE = 0x0703

fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)

fdw.write(bytearray([0xF3]))
while(True):
    try:
        data = fdr.read(3)
        break
    except:
        sleep(0.01)
msb = data[0]
lsb = data[1]
crc = data[2]

data16 = (msb << 8) | (lsb & 0xFC)
temp = -46.85 + (175.72 * data16 / 65536)
print("Temperature=", temp, "C")

fdw.write(bytearray([0xF5]))
while(True):
    try:
        data = fdr.read(3)
        break
    except:
        sleep(0.01)

msb = data[0]
lsb = data[1]
crc = data[2]
data16 = (msb << 8) | (lsb & 0xFC)
hum = -6 + 125.0 * data16 / 65536

print("humidity=", hum, "%")

print(crcCheck(msb, lsb, crc))
fdw.close()
fdr.close()

Page 183

import subprocess
import io
import struct
import fcntl
from time import sleep
import mmap
import os


def setTimeout(timeout):
    # find base address
    peripherals_base = 0x20000000
    try:
        with io.open('/proc/device-tree/soc/ranges', 'rb') as f:
            buf = f.read(16)
            peripherals_base = buf[4] << 24 | buf[5] << 16 | buf[6] << 8 | buf[7] << 0
            if peripherals_base == 0:
                peripherals_base = buf[8] << 24 | buf[9] << 16 | buf[10] << 8 | buf[11] << 0
    except:
        pass
#open /dev/mem
    try:
        memfd = os.open('/dev/mem', os.O_RDWR | os.O_SYNC)
    except OSError:
        print('unable to open /dev/mem upgrade your kernel or run as root')
# map file from start of BCS1 into memory
    mem = mmap.mmap(memfd, 4096, offset=peripherals_base + 0x804000)
    os.close(memfd)

# Write a value to the clock stretch timeout register
    struct.pack_into('<L', mem, 0x1C, timeout)

    mem.flush
    mem.close()


def crcCheck(msb, lsb, check):
    data32 = (msb << 16) | (lsb << 8) | check
    divisor = 0x988000
    for i in range(16):
        if(data32 & 1 << (23 - i)):
            data32 ^= divisor
        divisor >>= 1
    return data32


def checkI2CBus():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"],
                            stdout=subprocess.PIPE)
    output = str(temp.communicate())
    lasti2c = output.rfind("i2c_arm")
    if lasti2c != -1:
        lasti2c = output.find("i2c_arm=on", lasti2c)
    if lasti2c == -1:
        temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
                                stdout=subprocess.PIPE)
        output = str(temp.communicate())
    return


checkI2CBus()
I2C_SLAVE = 0x0703
fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)

setTimeout(10000)

fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)


fdw.write(bytearray([0xE3]))
data = fdr.read(3)

msb = data[0]
lsb = data[1]
crc = data[2]

data16 = (msb << 8) | (lsb & 0xFC)
temp = -46.85 + (175.72 * data16 / 65536)
print("Temperature=", temp, "C")
print(crcCheck(msb, lsb, crc))

fdw.write(bytearray([0xE5]))
data = fdr.read(3)

msb = data[0]
lsb = data[1]
crc = data[2]
data16 = (msb << 8) | (lsb & 0xFC)
hum = -6 + 125.0 * data16 / 65536

print("humidity=", hum, "%")
print(crcCheck(msb, lsb, crc))
fdw.close()
fdr.close()

Page 186

import subprocess
from ctypes import *
import os
import fcntl
import io


def checkI2CBus():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"],
                            stdout=subprocess.PIPE)
    output = str(temp.communicate())
    lasti2c = output.rfind("i2c_arm")
    if lasti2c != -1:
        lasti2c = output.find("i2c_arm=on", lasti2c)
    if lasti2c == -1:
        temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
                                stdout=subprocess.PIPE)
        output = str(temp.communicate())
    return


checkI2CBus()
I2C_SLAVE = 0x0703
I2C_FUNCS = 0x0705

I2C_FUNC_I2C = 0x00000001
I2C_FUNC_10BIT_ADDR = 0x00000002
I2C_FUNC_PROTOCOL_MANGLING = 0x00000004
I2C_FUNC_NOSTART = 0x00000010
I2C_FUNC_SLAVE = 0x00000020

i2cfd = os.open("/dev/i2c-1", os.O_RDWR | os.O_SYNC)
support = c_uint32(0)
fcntl.ioctl(i2cfd, I2C_FUNCS, support)
support = support.value
if support & I2C_FUNC_I2C:
    print("I2C Support")
if support & I2C_FUNC_10BIT_ADDR:
    print("10 bit address Support")
if support & I2C_FUNC_PROTOCOL_MANGLING:
    print("I2C Mangling Support")
if support & I2C_FUNC_NOSTART:
    print("I2C Nostart Support")
if support & I2C_FUNC_SLAVE:
    print("I2C Slave Support")

Page 191

import fcntl
import os
from ctypes import *


class Msgs(Structure):
    _fields_ = [("addr", c_uint16),
                ("flags", c_uint16),
                ("len", c_uint16),
                ("buf", POINTER(c_uint8))
                ]


class MsgSet(Structure):
    _fields_ = [("msgs", POINTER(Msgs)),
                ("nmsgs", c_uint32)
                ]


def i2cReadRegister(i2cfd, slaveaddr, reg, n):
    I2C_RDWR = 0x0707
    I2C_M_RD = 0x0001

    msgs = (Msgs*2)()

    msgs[0].addr = c_uint16(slaveaddr)
    msgs[0].flags = c_uint16(0)
    msgs[0].len = c_uint16(1)
    msgs[0].buf = POINTER(c_uint8)(c_uint8(reg))

    msgs[1].addr = c_uint16(slaveaddr)
    msgs[1].flags = c_uint16(I2C_M_RD)
    msgs[1].len = c_uint16(n)
    buf = (c_uint8*n)(0)
    msgs[1].buf = POINTER(c_uint8)(buf)

    msgset = MsgSet()
    msgset.msgs = POINTER(Msgs)(msgs)
    msgset.nmsgs = c_uint32(2)

    libc = CDLL("libc.so.6")
    libc.ioctl(i2cfd, I2C_RDWR, byref(msgset))

    return msgs[1].buf


def crcCheck(msb, lsb, check):
    data32 = (msb << 16) | (lsb << 8) | check
    divisor = 0x988000
    for i in range(16):
        if(data32 & 1 << (23 - i)):
            data32 ^= divisor
        divisor >>= 1
    return data32


i2cfd = os.open("/dev/i2c-1", os.O_RDWR | os.O_SYNC)
data = i2cReadRegister(i2cfd, 0x40, 0xE3, 3)
msb = data[0]
lsb = data[1]
crc = data[2]

data16 = (msb << 8) | (lsb & 0xFC)
temp = -46.85 + (175.72 * data16 / 65536)
print("Temperature=", temp, "C")
print(hex(msb), hex(lsb), hex(crc))
print(crcCheck(msb, lsb, crc))

Page 200

import subprocess
import io


def checkLM75():
    indicator1 = "i2c_arm=on"
    command1 = ["sudo", "dtparam", "i2c_arm=on"]
    indicator2 = "lm75"
    command2 = ["sudo", "dtoverlay", "i2c-sensor", "lm75", "addr=0x48"]
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output, flush=True)
    if output.find(indicator1) == -1:
        temp = subprocess.Popen(command1, stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output, flush=True)
    if output.find(indicator2) == -1:
        temp = subprocess.Popen(command2, stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output, flush=True)


checkLM75()

fdr = io.open("/sys/class/hwmon/hwmon2/temp1_input", "r")
buf = fdr.read()
print(buf)
temp = int(buf) / 1000
print(temp)
fdr.close()

fdw = io.open("/sys/class/hwmon/hwmon2/temp1_max", "w")
fdw.write("19000")
fdw.close()

fdw = io.open("/sys/class/hwmon/hwmon2/temp1_max_hyst", "w")
fdw.write("18000")
fdw.close()

Page 204

import subprocess
import io


def loadHtu21():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output)
    if output.find("htu21") == -1:
        temp = subprocess.Popen(
            ["sudo", "dtoverlay", "i2c-sensor", "htu21"], stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output)
    return


loadHtu21()

fdrt = io.open("/sys/bus/iio/devices/iio:device0/in_temp_input", "r")
print(int(fdrt.read())/1000)
fdrh = io.open(
    "/sys/bus/iio/devices/iio:device0/in_humidityrelative_input", "r")
print(int(fdrh.read())/1000)

Page 217

import subprocess
import io


def load1w(pin):
    indicator = "w1-gpio"
    command = ["sudo", "dtoverlay", "w1-gpio", "gpiopin="+str(pin)]
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output, flush=True)
    if output.find(indicator) == -1:
        temp = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output, flush=True)


def getDevices():
    fdr = io.open(
        "/sys/bus/w1/drivers/w1_master_driver/w1_bus_master1/w1_master_slaves", "r")
    buffer = fdr.read()
    fdr.close()
    return buffer.split()


def getData(dev, name):
    fdr = io.open("/sys/bus/w1/devices/"+dev+"/"+name, "r")
    data = fdr.read()
    fdr.close()
    return data


load1w(4)
devs = getDevices()
name = getData(devs[0], "name")
print("Name ", name)
resolution = getData(devs[0], "resolution")
print("Resolution ", resolution)
w1_slave = getData(devs[0], "w1_slave")
print("w1_slave ", w1_slave)
temperature = getData(devs[0], "temperature")
print("temperature ", temperature)
temperature = int(temperature) / 1000
print("temperature ", temperature, "C")
alarms = getData(devs[0], "alarms")
print("Alarms ", alarms)

Page 228

from ctypes import *
from socket import *
from struct import *
import os
import subprocess
import io
from time import sleep


NLMSG_DONE = 0x3

CN_W1_IDX = 0x3
CN_W1_VAL = 0x1

W1_SLAVE_ADD = 0
W1_SLAVE_REMOVE = 1
W1_MASTER_ADD = 2
W1_MASTER_REMOVE = 3
W1_MASTER_CMD = 4
W1_SLAVE_CMD = 5
W1_LIST_MASTERS = 6

W1_CMD_READ = 0
W1_CMD_WRITE = 1
W1_CMD_SEARCH = 2
W1_CMD_ALARM_SEARCH = 3
W1_CMD_TOUCH = 4
W1_CMD_RESET = 5
W1_CMD_SLAVE_ADD = 6
W1_CMD_SLAVE_REMOVE = 7
W1_CMD_LIST_SLAVES = 8
W1_CMD_MAX = 9

NETLINK_CONNECTOR = 11

nl_seq = 0


class nlmsghdr(Structure):
    _fields_ = [("nlmsg_len", c_uint32),
                ("nlmsg_type", c_uint16),
                ("nlmsg_flags", c_uint16),
                ("nlmsg_seq", c_uint32),
                ("nlmsg_pid", c_uint32)
                ]


cnh = nlmsghdr()
cnh.nlmsg_seq = c_uint32(nl_seq)
nl_seq += 1
cnh.nlmsg_pid = c_uint32(os.getpid())
cnh.nlmsg_type = c_uint16(NLMSG_DONE)
cnh.nlmsg_flags = c_uint16(0)


class cn_msg(Structure):
    _fields_ = [("idx", c_uint32),
                ("val", c_uint32),
                ("seq", c_uint32),
                ("ack", c_uint32),
                ("len", c_uint16),
                ("flags", c_uint16),
                ]


cmsg = cn_msg()
cmsg.idx = c_uint32(CN_W1_IDX)
cmsg.val = c_uint32(CN_W1_VAL)
cmsg.seq = c_uint32(cnh.nlmsg_seq)
cmsg.ack = c_uint32(0)


class w1_netlink_msg(Structure):
    _fields_ = [("type", c_uint8),
                ("status", c_uint8),
                ("len", c_uint16),
                ("id", c_uint8*8),
                ]


msg = w1_netlink_msg()
msg.type = c_uint8(W1_MASTER_CMD)
msg.id = (c_uint8*8).from_buffer_copy(c_uint64(1))


class w1_netlink_cmd(Structure):
    _fields_ = [("cmd", c_uint8),
                ("res", c_uint8),
                ("len", c_uint16),
                ]


w1_cmd = w1_netlink_cmd()
w1_cmd.cmd = c_uint8(W1_CMD_SEARCH)
w1_cmd.len = c_uint16(0)


msg.len = c_uint16(len(bytearray(w1_cmd)) + w1_cmd.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)


s = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR)
s.connect((0, AF_NETLINK))

buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg)+bytearray(w1_cmd)
n = s.send(buffer)
print("Bytes sent", n, flush=True)
buffer2 = s.recv(65535)
print("Bytes received", len(buffer2), flush=True)

p = 0
cn_cnhr = nlmsghdr.from_buffer_copy(buffer2, p)
p = p+len(bytes(cn_cnhr))
cmsgr = cn_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(cmsgr))
msgr = w1_netlink_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(msgr))
w1_cmdr = w1_netlink_cmd.from_buffer_copy(buffer2, p)

num = int(w1_cmdr.len/8)
print("number of slaves", num)

p = p+len(bytes(w1_cmdr))
slaves = []
for i in range(num):
    slaves.append(c_uint64.from_buffer_copy(buffer2, p+i*8).value)
print([hex(slave) for slave in slaves])

# GET ACK
buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)

# READ TEMPERATURE
# SET UP SLAVE COMMAND
print()
print("Read Temperature")

msg.type = W1_SLAVE_CMD

id = slaves[0].to_bytes(8, "little")
msg.id = (c_uint8*8).from_buffer_copy(id)


# SETUP AND SEND WRITE 0x44 = CONVERT
w1_cmd.cmd = c_uint8(W1_CMD_WRITE)
w1_cmd_data = c_uint8(0x44)


cnh.nlmsg_seq = c_uint32(nl_seq)
nl_seq += 1
cmsg.seq = c_uint32(cnh.nlmsg_seq)

w1_cmd.len = c_uint16(1)
msg.len = c_uint16(len(bytearray(w1_cmd)) + w1_cmd.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)

buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg) + \
    bytearray(w1_cmd)+bytearray(w1_cmd_data)

n = s.send(buffer)
print("Bytes sent", n, flush=True)
buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)
sleep(1)

# READ SCRATCH PAD
cnh.nlmsg_seq = nl_seq
nl_seq += 1
cmsg.seq = cnh.nlmsg_seq

w1_cmd.cmd = W1_CMD_WRITE
w1_cmd_data = c_uint8(0xBE)

w1_cmd2 = w1_netlink_cmd()
w1_cmd2.cmd = c_uint8(W1_CMD_READ)


w1_cmd.len = c_uint16(1)
w1_cmd2.len = c_uint16(9)
msg.len = c_uint16(len(bytearray(w1_cmd)) +
                   len(bytearray(w1_cmd2)) + w1_cmd.len+w1_cmd2.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)

w1_cmd2_data = (c_uint8*9)(0)
buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg)+bytearray(w1_cmd) + \
    bytearray(w1_cmd_data)+bytearray(w1_cmd2)+bytearray(w1_cmd2_data)
n = s.send(buffer)
print("Bytes sent", n, flush=True)

buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)

buffer2 = s.recv(65535)
print("length of data received ", len(buffer2), flush=True)
p = 0
cn_cnhr = nlmsghdr.from_buffer_copy(buffer2, p)
# print(cn_cnhr.nlmsg_len)
p = p+len(bytes(cn_cnhr))
cmsgr = cn_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(cmsgr))
msgr = w1_netlink_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(msgr))
w1_cmdr = w1_netlink_cmd.from_buffer_copy(buffer2, p)
p = p+len(bytes(w1_cmdr))

temp = c_uint16.from_buffer_copy(buffer2, p)

print("number of bytes found ", w1_cmdr.len, flush=True)
print(temp.value/16)

VSCODE TASKS

 

settings.json

which has to be customized to your IP, user name and folder:

{
    "sshUser": "mike",
    "sshEndpoint": "192.168.253.20",
    "remoteDirectory": "/home/mike/Documents/${workspaceFolderBasename}",  
}

tasks.json

{
    "version": "2.0.0",
    "tasks": [
        {
            "label": "copyToRemote",
            "type": "shell",
            "command": "scp -r ${fileDirname} ${config:sshUser}@${config:sshEndpoint}:${config:remoteDirectory}/",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
                "clear": true
            }
        },
        {
            "label": "makeRemoteWorkSpace",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} 'mkdir  ${config:remoteDirectory}'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "RunR",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} 'python3 ${config:remoteDirectory}/${relativeFileDirname}/${fileBasename}'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "RunRemote",
            "dependsOrder": "sequence",
            "dependsOn": [
                "copyToRemote",
                "RunR"
            ],
            "problemMatcher": [],
            "group": {
                "kind": "build",
                "isDefault": true
            },
        },
        {
            "label": "StopREmotePython",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} 'pkill python3'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": true,
            }
        },
        {
            "label": "wait",
            "type": "shell",
            "command": "timeout 10"
        },
        {
            "label": "tunnel",
            "type": "shell",
            "command": "ssh -2 -L 5678:localhost:5678  ${config:sshUser}@${config:sshEndpoint}",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "startDebug",
            "type": "shell",
            "command": "ssh -2 ${config:sshUser}@${config:sshEndpoint} 'nohup python3 -m debugpy --listen 0.0.0.0:5678 --wait-for-client ${config:remoteDirectory}/${relativeFileDirname}/${fileBasename} > /dev/null 2>&1 &'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "copyAndDebug",
            "dependsOrder": "sequence",
            "dependsOn": [
                "copyToRemote",
                "startDebug",
                "wait"
            ],
            "presentation": {
                "showReuseMessage": false,
            },
        },
    ]
}
launch.json
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Remote Attach",
            "type": "python",
            "request": "attach",
            "connect": {
                "host": "localhost",
                "port": 5678
            },
            "pathMappings": [
                {
                    "localRoot": "${workspaceFolder}/${relativeFileDirname}/",
                    "remoteRoot": "${config:remoteDirectory}/${relativeFileDirname}"
                }
            ],
            "preLaunchTask": "copyAndDebug",
            "postDebugTask": "StopREmotePython"
        }
    ]
}

Raspberry Pi IoT In Python Using Linux Drivers 
Second Edition

Programs

DriverPython2e360

We have decided not to make the programs available as a download because this is not the point of the book - the programs are not finished production code but something you should type in and study.

The best solution is to provide the source code of the programs in a form that can be copied and pasted into a NetBeans of VS Code project. 

 

Note: The VS Code task listings are at the very end of this page.

The only downside is that you have to create a project to paste the code into.

To do this follow the instructionin the book-- in particular make sure you have added any libraries to the link step and are running as root if required. 

All of the programs below were copy and pasted from working programs in the IDE. They have been formatted using the built in formatter and hence are not identical in layout to the programs in the book. This is to make copy and pasting them easier. The programs in the book are formatted to be easy to read on the page.

If anything you consider important is missing or if you have any requests or comments  contact:

This email address is being protected from spambots. You need JavaScript enabled to view it. 


Page 41

import io
from time import sleep

fd = io.open("/sys/class/leds/ACT/trigger","w")
fd.write("none")
fd.close()

fd = io.open("/sys/class/leds/ACT/brightness", "w")
while(True):
    fd.write("0")
    fd.flush()
    sleep(1)
    fd.write("1")
    fd.flush()
    sleep(1)

 

Page 43

import io
from time import sleep

fd = io.open("/sys/class/leds/ACT/trigger","w")
fd.write("timer")
fd.close()
fd = io.open("/sys/class/leds/ACT/delay_on", "w")
fd.write("2000")
fd.close()
fd = io.open("/sys/class/leds/ACT/delay_off", "w")
fd.write("3000")
fd.close()
 

 

Page 53

import gpiod
chip = gpiod.Chip("0")
print(chip.label())
print(chip.name())
print(chip.num_lines())
chip.close()
 
change 0 to 4 for a Pi 5

Page 57

import gpiod

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
while(True):
    line.set_value(1)
    line.set_value(0)
 
change 0 to 4 for a Pi 5
 

Page 57

import gpiod

chip = gpiod.Chip("0")
lines = chip.get_lines([4, 17])

lines.request(consumer="myprog.py", type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0, 1])
while(True):
    lines.set_values([1, 0])
    lines.set_values([0, 1])
 

Remember the need to change the chip number to "4" for a Pi 5.

Page 59

import gpiod
from time import sleep, time_ns

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_IN)
sleep(0.010)
line.release()

line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
sleep(0.010)
line.release()

line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_IN)
t = time_ns()
while(line.get_value() == 0):
    pass
print((time_ns()-t)/1000)

 

Remember the need to change the chip number to "4" for a Pi 5. 

 


Page 67

import fcntl
import struct
import io
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401

f = io.open("/dev/gpiochip0", "rb", buffering=0)

gpiochip_info = struct.Struct("32s 32s L")
buffer = gpiochip_info.pack(b' ', b' ', 0)

result = fcntl.ioctl(f, GPIO_GET_CHIPINFO_IOCTL, buffer)

name, label, lines = gpiochip_info.unpack(result)

print(name.rstrip(b'\0').decode("utf-8"))
print(label.rstrip(b'\0').decode("utf-8"))
print(lines)
 
To run it on a Pi 5 change gpiochip0 to gpiochip4

Page 70

import fcntl
import struct
import io

GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408

GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10

gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")

lines = [0]*64
lines[0] = 4
lines[1] = 17
values = [0]*64
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_OUTPUT, *values, b"Output test", 2, 0)

f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
f.close()
fL = struct.unpack_from("L", buffer, 360)[0]

values[0] = 1
values[1] = 0
state1 = gpiohandle_data.pack(*values)

values[0] = 0
values[1] = 1
state2 = gpiohandle_data.pack(*values)

while(True):
    result = fcntl.ioctl(fL, GPIOHANDLE_SET_LINE_VALUES_IOCTL, state1)
    result = fcntl.ioctl(fL, GPIOHANDLE_SET_LINE_VALUES_IOCTL, state2)
 

Remember to change gpiochip0 to gpiochip4 if you are running on a Pi 5.

Page 72

import fcntl
import struct
import io
import os

GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408

GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10

gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")

lines = [0]*64
lines[0] = 4
lines[1] = 17
values = [0]*64
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_INPUT, *values, b"Input test", 2, 0)

f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
f.close()
fL = struct.unpack_from("L", result, 360)[0]
inputBuffer = gpiohandle_data.pack(*values)
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, inputBuffer)
os.close(fL)
print(result[0], result[1])
 

Remember to change gpiochip0 to gpiochip4 if you are running on a Pi 5.

Page 73

import fcntl
import struct
import io
import os
from time import sleep, time_ns
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408

GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10

gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")

lines = [0]*64
lines[0] = 4
values = [0]*64

buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_INPUT,*values, b"Resistance test", 1, 0)

f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
fL = struct.unpack_from("L", result, 360)[0]
state = gpiohandle_data.pack(*values)
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, state)
sleep(0.01)
os.close(fL)

buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_OUTPUT, *values, b"Resistance test", 1, 0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
fL = struct.unpack_from("L", result, 360)[0]
fcntl.ioctl(fL, GPIOHANDLE_SET_LINE_VALUES_IOCTL, state)
sleep(0.01)
os.close(fL)

buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_INPUT,*values, b"Resistance test", 1, 0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
fL = struct.unpack_from("L", result, 360)[0]
t = time_ns()
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, state)
while(result[0] == 0):
    result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, state)
print((time_ns()-t)/1000)
os.close(fL)
f.close()
 

Remember to change gpiochip0 to gpiochip4 if you are running on a Pi 5.


 

Page 80

import gpiod

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)

event = line.event_wait(sec=1)
if event:
    print("Event on line 4")
else:
    print("Time out")
 

Remember to change the chip number from 0” to “4” if running on a Pi 5.

Page 81

import gpiod
from time import sleep, time_ns

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_IN)
sleep(0.010)
line.release()

line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
sleep(0.010)
line.release()

line.request(consumer="resistor.py", type=gpiod.LINE_REQ_EV_RISING_EDGE)
t = time_ns()
event = line.event_read()
print((event.sec*1000000000+event.nsec-t)/1000)
 

Remember to change the chip number from 0” to “4” if running on a Pi 5.

 

Page 82

import gpiod

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)

while(True):
    while True:
        event1 = line.event_read()
        if event1.type == event1.RISING_EDGE:
            break
    while True:
        event2 = line.event_read()
        if event2.type == gpiod.LineEvent.FALLING_EDGE:
            break
    print((event2.sec-event2.sec)*1000000000 +
          (event2.nsec-event1.nsec))
 

Remember to change the chip number from 0” to “4” if running on a Pi 5.

 

Page 783a

import gpiod
from time import sleep

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_DIR_IN)
print("Press the button", flush=True)
sleep(20)
if line.get_value() == 1:
    print("button pressed")
else:
    print("button not pressed")
 

Don’t forget to change "0" to "4" if running on a Pi 5.

Page 83b

import gpiod
from time import sleep

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)
print("Press the button", flush=True)
sleep(20)
event = line.event_wait(sec=0)
if event:
    print("button pressed")
else:
    print("button not pressed")
 

Don’t forget to change "0" to "4" if running on a Pi 5.

Page 84

import gpiod

def event_poll(line):
    event = line.event_wait(sec=0)
    if event:
        event = line.event_read()
    return event


chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)
while True:
    event = event_poll(line)
    if event:
        print(event.sec, flush=True)
 

Don’t forget to change "0" to "4" if running on a Pi 5.

Page 87

import fcntl
import struct
import io
import os
from time import sleep, time_ns
import select

GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408
GPIO_GET_LINEEVENT_IOCTL = 0xC030B404

GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10

GPIOEVENT_REQUEST_RISING_EDGE = 0x01
GPIOEVENT_REQUEST_FALLING_EDGE = 0x02
GPIOEVENT_REQUEST_BOTH_EDGES = 0x03

gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")
gpioevent_request = struct.Struct("L L L 32s L")
gpioevent_data = struct.Struct("Q L")

buffer = gpioevent_request.pack(4, GPIOHANDLE_REQUEST_INPUT, GPIOEVENT_REQUEST_BOTH_EDGES, b"Event test", 0)

f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEEVENT_IOCTL, buffer)
f.close()
fL = struct.unpack_from("L", result, 44)[0]

e = select.epoll()
e.register(fL)
while True:
    events = e.poll(1)
    if len(events) > 0:
        buffer = os.read(fL, 100)
        timestamp, id = gpioevent_data.unpack(buffer[0:12])
        print(timestamp, id, flush=True)
 

Don’t forget to change "0" to "4" if running on a Pi 5.

Page 88

import gpiod
import struct
import io
import os
import select

gpioevent_data = struct.Struct("Q L")

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)

fL = line.event_get_fd()
e = select.epoll()
e.register(fL)
while True:
    events = e.poll(1)
    if len(events) > 0:
        buffer = os.read(fL, 100)
        timestamp, id = gpioevent_data.unpack(buffer[0:12])
        print(timestamp, id, flush=True)
 

Don’t forget to change "0" to "4" if running on a Pi 5.

Page 90

import gpiod
from time import sleep
import threading


def waitForInterrupt(line):
    while(True):
        event = line.event_read()
        print(event.sec*1000000000+event.nsec, flush=True)


chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)

IntThread = threading.Thread(target=waitForInterrupt, args=(line,))
IntThread.start()


while True:
    sleep(2)
    print("running", flush=True)
 
Don’t forget to change "0" to "4" if running on a Pi 5.

 

Page 100

import io

fdr = io.open("/sys/bus/iio/devices/iio:device0/name", "rb", buffering=0)
name = fdr.readline().decode("utf-8")
print("name=", name)
fdr.close()

fdr = io.open("/sys/bus/iio/devices/iio:device0/in_temp_input",  "rb", buffering=0)
temp = fdr.readline().decode("utf-8")
print("temp=", int(temp)/1000, "C")
fdr.close()

fdr = io.open("/ sys/bus/iio/devices/iio: device0 /in_humidityrelative_input", "rb", buffering=0)
hum = fdr.readline().decode("utf-8")
print("Humidity=", int(hum)/1000, "%")
fdr.close()

Page 102

import subprocess
import io
import fcntl


def checkDht11():
    indicator = "dht11  gpiopin=4"
    command = ["sudo", "dtoverlay", "dht11", "gpiopin=4"]
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output)
    if output.find(indicator) == -1:
        temp = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output)
    return


checkDht11()

fdr = io.open("/sys/bus/iio/devices/iio:device0/name", "rb", buffering=0)
name = fdr.readline().decode("utf-8")
print("name=", name)
fdr.close()

fdr = io.open("/sys/bus/iio/devices/iio:device0/in_temp_input",  "rb", buffering=0)
temp = fdr.readline().decode("utf-8")
print("temp=", int(temp)/1000, "C")
fdr.close()

fdr = io.open( "/sys/bus/iio/devices/iio:device0/in_humidityrelative_input", "rb", buffering=0)
hum = fdr.readline().decode("utf-8")
print("Humidity=", int(hum)/1000, "%")
fdr.close()
 

 

Page 123a

import gpiod
from time import sleep

chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])

period = 20
duty = 25

ontime = period/1000 * duty / 100
offtime = period/1000 - ontime

while(True):
    line.set_value(1)
    sleep(ontime)
    line.set_value(0)
    sleep(offtime)
 

Remember to change "0" to "4" if you are running on a Pi 5.

Page 123b

import gpiod
from time import sleep
import threading


def softPWM(line, period, duty):
    ontime = period/1000 * duty / 100
    offtime = period/1000 - ontime
    while(True):
        line.set_value(1)
        sleep(ontime)
        line.set_value(0)
        sleep(offtime)


chip = gpiod.Chip("0")
line = chip.get_line(4)

line.request(consumer="myprog.py", type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])

period = 20
duty = 75

IntThread = threading.Thread(target=softPWM, args=(line, period, duty))
IntThread.start()

while(True):
    print("working", flush=True)
    sleep(2)
 

Remember to change "0" to "4" if you are running on a Pi 5.

 

Page 127

import subprocess
import io

def checkPWM():
    indicator = "pwm-2chan"
    command = ["sudo", "dtoverlay", "pwm-2chan"]
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output)
    if output.find(indicator) == -1:
        temp = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output)
    return


checkPWM()

fdw = io.open("/sys/class/pwm/pwmchip0/export", "wb", buffering=0)
fdw.write(b"0")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip0/pwm0/period", "wb", buffering=0)
fdw.write(b"10000000")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip0/pwm0/duty_cycle", "wb", buffering=0)
fdw.write(b"8000000")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip0/pwm0/enable", "wb", buffering=0)
fdw.write(b"1")
fdw.close()

Page 128

import subprocess
import io

def checkPWM():
    indicator = "pwm-2chan"
    command =["sudo", "dtoverlay", "pwm-2chan","pin=18", "func=2", "pin2=12","func2=4"]
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout = subprocess.PIPE)
    output = str(temp.communicate())
    print(output)
    if output.find(indicator)!=-1:
        temp = subprocess.Popen(["sudo","dtoverlay", "-R", "pwm-2chan"], stdout = subprocess.PIPE)
    temp = subprocess.Popen(command, stdout = subprocess.PIPE)
    output = str(temp.communicate())
    print(output)
    return

checkPWM()

fdw = io.open("/sys/class/pwm/pwmchip2/export", "wb", buffering=0)
fdw.write(b"0")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip2/pwm0/period","wb", buffering=0)
fdw.write(b"10000000")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip2/pwm0/duty_cycle", "wb", buffering=0)
fdw.write(b"8000000")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip2/pwm0/enable", "wb", buffering=0)
fdw.write(b"1")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip2/export", "wb", buffering=0)
fdw.write(b"2")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip2/pwm2/period",  "wb", buffering=0)
fdw.write(b"10000000")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip2/pwm2/duty_cycle", "wb", buffering=0)
fdw.write(b"8000000")
fdw.close()

fdw = io.open("/sys/class/pwm/pwmchip2/pwm2/enable","wb", buffering=0)
fdw.write(b"1")
fdw.close()
 
 

Page 130

import subprocess
import io
from time import sleep
import os
 
Pi5=True
 
class Pwm:
    def __init__(self, channel,f,duty,Pi5=False):
        if not(channel==0 or channel==1):
            return
        if Pi5:
            self.path="/sys/class/pwm/pwmchip2/"
        else:
            self.path="/sys/class/pwm/pwmchip0/"

        self.chan = str(channel)
   
        indicator = "pwm-2chan  pin=12 func=4 pin2=13 func2=4"
        command =["sudo", "dtoverlay", "pwm-2chan","pin=12", "func=4", "pin2=13","func2=4"]
        temp = subprocess.Popen(["sudo", "dtparam", "-l"],  stdout = subprocess.PIPE)
        output = str(temp.communicate())
        print(output)
        if output.find(indicator)==-1:
            temp = subprocess.Popen(command, stdout = subprocess.PIPE)
            output = str(temp.communicate())
            print(output)    

        if not(os.path.exists(self.path+"pwm"+self.chan)):
            fdw = io.open(self.path+"export", "w")
            fdw.write(self.chan)
            fdw.close()

        while not(os.path.exists(self.path+"pwm"+self.chan+  "/enable")):
            pass
             
        self.fdwp = io.open(self.path+"pwm"+self.chan+ "/period", "w")
        self.setFreq(f)
        self.fdwd = io.open(self.path+"pwm"+self.chan+ "/duty_cycle", "w")
        self.setDuty(duty)

    def setFreq(self,f):
        self.f=int(1000000000/f)  
        self.fdwp.write(str(self.f))
        self.fdwp.flush()
   
    def setDuty(self,duty):
        self.duty=int(self.f*duty/100)
        self.fdwd.write(str(self.duty))
        self.fdwd.flush()

    def enableChan(self):
        fdw = io.open(self.path+"pwm"+self.chan+"/enable", "w")
        fdw.write("1")
        fdw.close()
   
    def disableChan(self):
        fdw = io.open(self.path+"pwm"+self.chan+"/enable", "w")
        fdw.write("0")
        fdw.close()
   
    def closeChan(self):
        self.fdwd.close()
        self.fdwp.close()
        fdw = io.open(self.path+"unexport", "w")
        fdw.write(self.chan)        
        fdw.close()
 

Page 141

import subprocess
import io
from time import sleep
import os
 
Pi5=True
 
class Pwm:
    def __init__(self, channel,f,duty,Pi5=False):
        if not(channel==0 or channel==1):
            return
        if Pi5:
            self.path="/sys/class/pwm/pwmchip2/"
        else:
            self.path="/sys/class/pwm/pwmchip0/"

        self.chan = str(channel)
        indicator = "pwm-2chan"
        command =["sudo", "dtoverlay", "pwm-2chan"]
        temp = subprocess.Popen(["sudo", "dtparam", "-l"],  stdout = subprocess.PIPE)
        output = str(temp.communicate())
        print(output,flush=True)

        indicator = "pwm-2chan  pin=12 func=4 pin2=13 func2=4"
        command =["sudo", "dtoverlay", "pwm-2chan"]
        temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout = subprocess.PIPE)
        output = str(temp.communicate())
        print(output,flush=True)
        command =["sudo", "dtoverlay", "pwm-2chan","pin=12", "func=4", "pin2=13","func2=4"]
        temp = subprocess.Popen(["sudo", "dtparam", "-l"],  stdout = subprocess.PIPE)
        output = str(temp.communicate())
        print(output)
        if output.find(indicator)==-1:
            temp = subprocess.Popen(command,
                                         stdout = subprocess.PIPE)
            output = str(temp.communicate())
            print(output)
   
        if not(os.path.exists(self.path+"pwm"+self.chan)):
            fdw = io.open(self.path+"export", "w")
            fdw.write(self.chan)
            fdw.close()
        while not(os.path.exists(self.path+"pwm"+self.chan+"/enable")):
            pass
             
        self.fdwp = io.open(self.path+"pwm"+self.chan+"/period", "w")
        self.setFreq(f)
        self.fdwd = io.open(self.path+"pwm"+self.chan+"/duty_cycle", "w")
        self.setDuty(duty)

    def setFreq(self,f):
        self.f=int(1000000000/f)  
        t=str(self.f)
        self.fdwp.write(str(self.f))
        self.fdwp.flush()
   
    def setDuty(self,duty):
        self.duty=int(self.f*duty/100)
        self.fdwd.write(str(self.duty))
        self.fdwd.flush()

    def enableChan(self):
        fdw = io.open(self.path+"pwm"+self.chan+"/enable", "w")
        fdw.write("1")
        fdw.close()
   
    def disableChan(self):
        fdw = io.open(self.path+"pwm"+self.chan+"/enable", "w")
        fdw.write("0")
        fdw.close()
   
    def closeChan(self):
        self.fdwd.close()
        self.fdwp.close()
        fdw = io.open(self.path+"unexport", "w")
        fdw.write(self.chan)        
        fdw.close()
   
    def inverChan(self):
        fdw = io.open(self.path+ "pwm"+self.chan+"/polarity", "w")
        fdw.write("inversed")
        fdw.close()
   
    def normalChan(self):
        fdw = io.open(self.path+ "pwm"+self.chan+"/polarity", "w")
        fdw.write("normal")
        fdw.close(

 


Page 151

import subprocess
import spidev
from time import sleep


def checkSPI():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    lastSPI = output.rfind("spi")
    if lastSPI != -1:
        lastSPI = output.find("spi=on", lastSPI)
    if lastSPI == -1:
        temp = subprocess.Popen(
            ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
        output = str(temp.communicate())


checkSPI()

spi = spidev.SpiDev()
spi.open(0, 0)
spi.max_speed_hz = 5000
Data = [0xAA]
print(hex(Data[0]))
data = spi.xfer(Data)
print(hex(Data[0]))
if Data == 0xAA:
    print("match")
spi.close()

Page 161

import subprocess
import spidev

def checkSPI():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    lastSPI = output.rfind("spi")
    if lastSPI != -1:
        lastSPI = output.find("spi=on", lastSPI)
    if lastSPI == -1:
        temp = subprocess.Popen(
            ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
        output = str(temp.communicate())


checkSPI()

spi = spidev.SpiDev()
spi.open(0, 0)
spi.max_speed_hz = 100000
spi.mode = 0
spi.bits_per_word = 8

Data = [0x01, 0x80, 0x00]
spi.xfer(Data)
raw = (Data[1] & 0x03) << 8 | Data[2]
print(raw)
volts = raw * 3.3 / 1023.0
print(volts, "V")
spi.close()
 

 

Page 181

import subprocess
import io
import fcntl


def checkI2CBus():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    lasti2c = output.rfind("i2c_arm")
    if lasti2c != -1:
        lasti2c = output.find("i2c_arm=on", lasti2c)
    if lasti2c == -1:
        temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],  stdout=subprocess.PIPE)
        output = str(temp.communicate())
    return


checkI2CBus()

I2C_SLAVE = 0x0703

fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)
fdw.write(bytearray([0xE7]))
data = fdr.read(1)
print(data)
fdr.close()
fdw.close()

Page 183

import subprocess
import io
import fcntl
from time import sleep

checkI2CBus()
I2C_SLAVE = 0x0703

fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)

fdw.write(bytearray([0xF3]))
while(True):
    try:
        data = fdr.read(3)
        break
    except:
        sleep(0.01)
msb = data[0]
lsb = data[1]
crc = data[2]
print("msb=", msb, " lsb=", lsb, " crc=", crc)

fdw.close()
fdr.close()

Page 187

import subprocess
import io
import fcntl
from time import sleep


def crcCheck(msb, lsb, check):
    data32 = (msb << 16) | (lsb << 8) | check
    divisor = 0x988000
    for i in range(16):
        if(data32 & 1 << (23 - i)):
            data32 ^= divisor
        divisor >>= 1
    return data32


def checkI2CBus():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"],  stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output)
    lasti2c = output.rfind("i2c_arm")
    if lasti2c != -1:
        lasti2c = output.find("i2c_arm=on", lasti2c)
    if lasti2c == -1:
        temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],  stdout=subprocess.PIPE)
        output = str(temp.communicate())
    return


checkI2CBus()
I2C_SLAVE = 0x0703

fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)

fdw.write(bytearray([0xF3]))
while(True):
    try:
        data = fdr.read(3)
        break
    except:
        sleep(0.01)
msb = data[0]
lsb = data[1]
crc = data[2]

data16 = (msb << 8) | (lsb & 0xFC)
temp = -46.85 + (175.72 * data16 / 65536)
print("Temperature=", temp, "C")

fdw.write(bytearray([0xF5]))
while(True):
    try:
        data = fdr.read(3)
        break
    except:
        sleep(0.01)

msb = data[0]
lsb = data[1]
crc = data[2]
data16 = (msb << 8) | (lsb & 0xFC)
hum = -6 + 125.0 * data16 / 65536

print("humidity=", hum, "%")

print(crcCheck(msb, lsb, crc))
fdw.close()
fdr.close()
 

 

Page 197

import subprocess
import io
import fcntl
from time import sleep

def checkI2CBus():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout = subprocess.PIPE)
    output = str(temp.communicate())
    lasti2c=output.rfind("i2c_arm")
    if lasti2c!=-1:
        lasti2c=output.find("i2c_arm=on",lasti2c)
    if lasti2c==-1:
        temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],  stdout = subprocess.PIPE)
        output = str(temp.communicate())
    return

I2C_SLAVE=0x0703

fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)

fdw.write( bytearray([0xE3]))

data=fdr.read(3)
msb=data[0]
lsb=data[1]
crc=data[2]
print("msb=",msb," lsb=",lsb," crc=",crc)

fdw.close()
fdr.close()

Page 200

import subprocess
from ctypes import *
import os
import fcntl
import io


def checkI2CBus():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"],  stdout=subprocess.PIPE)
    output = str(temp.communicate())
    lasti2c = output.rfind("i2c_arm")
    if lasti2c != -1:
        lasti2c = output.find("i2c_arm=on", lasti2c)
    if lasti2c == -1:
        temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"], stdout=subprocess.PIPE)
        output = str(temp.communicate())
    return


checkI2CBus()
I2C_SLAVE = 0x0703
I2C_FUNCS = 0x0705

I2C_FUNC_I2C = 0x00000001
I2C_FUNC_10BIT_ADDR = 0x00000002
I2C_FUNC_PROTOCOL_MANGLING = 0x00000004
I2C_FUNC_NOSTART = 0x00000010
I2C_FUNC_SLAVE = 0x00000020

i2cfd = os.open("/dev/i2c-1", os.O_RDWR | os.O_SYNC)
support = c_uint32(0)
fcntl.ioctl(i2cfd, I2C_FUNCS, support)
support = support.value
if support & I2C_FUNC_I2C:
    print("I2C Support")
if support & I2C_FUNC_10BIT_ADDR:
    print("10 bit address Support")
if support & I2C_FUNC_PROTOCOL_MANGLING:
    print("I2C Mangling Support")
if support & I2C_FUNC_NOSTART:
    print("I2C Nostart Support")
if support & I2C_FUNC_SLAVE:
    print("I2C Slave Support")

Page 204

import fcntl
import os
from ctypes import *


class Msgs(Structure):
    _fields_ = [("addr", c_uint16),
                ("flags", c_uint16),
                ("len", c_uint16),
                ("buf", POINTER(c_uint8))
                ]


class MsgSet(Structure):
    _fields_ = [("msgs", POINTER(Msgs)),
                ("nmsgs", c_uint32)
                ]


def i2cReadRegister(i2cfd, slaveaddr, reg, n):
    I2C_RDWR = 0x0707
    I2C_M_RD = 0x0001

    msgs = (Msgs*2)()

    msgs[0].addr = c_uint16(slaveaddr)
    msgs[0].flags = c_uint16(0)
    msgs[0].len = c_uint16(1)
    msgs[0].buf = POINTER(c_uint8)(c_uint8(reg))

    msgs[1].addr = c_uint16(slaveaddr)
    msgs[1].flags = c_uint16(I2C_M_RD)
    msgs[1].len = c_uint16(n)
    buf = (c_uint8*n)(0)
    msgs[1].buf = POINTER(c_uint8)(buf)

    msgset = MsgSet()
    msgset.msgs = POINTER(Msgs)(msgs)
    msgset.nmsgs = c_uint32(2)

    libc = CDLL("libc.so.6")
    libc.ioctl(i2cfd, I2C_RDWR, byref(msgset))

    return msgs[1].buf


def crcCheck(msb, lsb, check):
    data32 = (msb << 16) | (lsb << 8) | check
    divisor = 0x988000
    for i in range(16):
        if(data32 & 1 << (23 - i)):
            data32 ^= divisor
        divisor >>= 1
    return data32


i2cfd = os.open("/dev/i2c-1", os.O_RDWR | os.O_SYNC)
data = i2cReadRegister(i2cfd, 0x40, 0xE3, 3)
msb = data[0]
lsb = data[1]
crc = data[2]

data16 = (msb << 8) | (lsb & 0xFC)
temp = -46.85 + (175.72 * data16 / 65536)
print("Temperature=", temp, "C")
print(hex(msb), hex(lsb), hex(crc))
print(crcCheck(msb, lsb, crc))
 

 

Page 212

import subprocess
import io


def checkLM75():
    indicator1 = "i2c_arm=on"
    command1 = ["sudo", "dtparam", "i2c_arm=on"]
    indicator2 = "lm75"
    command2 = ["sudo", "dtoverlay", "i2c-sensor", "lm75", "addr=0x48"]
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output, flush=True)
    if output.find(indicator1) == -1:
        temp = subprocess.Popen(command1, stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output, flush=True)
    if output.find(indicator2) == -1:
        temp = subprocess.Popen(command2, stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output, flush=True)


checkLM75()

fdr = io.open("/sys/class/hwmon/hwmon2/temp1_input", "r")
buf = fdr.read()
print(buf)
temp = int(buf) / 1000
print(temp)
fdr.close()

fdw = io.open("/sys/class/hwmon/hwmon2/temp1_max", "w")
fdw.write("19000")
fdw.close()

fdw = io.open("/sys/class/hwmon/hwmon2/temp1_max_hyst", "w")
fdw.write("18000")
fdw.close()

Page 215

import subprocess
import io

def loadHtu21():
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output)
    if output.find("htu21") == -1:
        temp = subprocess.Popen( ["sudo", "dtoverlay", "i2c-sensor", "htu21"], stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output)
    return

loadHtu21()

fdrt = io.open("/sys/bus/iio/devices/iio:device0/in_temp_input", "r")
print(int(fdrt.read())/1000)
fdrh = io.open( "/sys/bus/iio/devices/iio:device0/in_humidityrelative_input", "r")
print(int(fdrh.read())/1000)
 

 

Page 229

import subprocess
import io


def load1w(pin):
    indicator = "w1-gpio"
    command = ["sudo", "dtoverlay", "w1-gpio", "gpiopin="+str(pin)]
    temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print(output, flush=True)
    if output.find(indicator) == -1:
        temp = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print(output, flush=True)


def getDevices():
    fdr = io.open(
        "/sys/bus/w1/drivers/w1_master_driver/w1_bus_master1/w1_master_slaves", "r")
    buffer = fdr.read()
    fdr.close()
    return buffer.split()


def getData(dev, name):
    fdr = io.open("/sys/bus/w1/devices/"+dev+"/"+name, "r")
    data = fdr.read()
    fdr.close()
    return data


load1w(4)
devs = getDevices()
name = getData(devs[0], "name")
print("Name ", name)
resolution = getData(devs[0], "resolution")
print("Resolution ", resolution)
w1_slave = getData(devs[0], "w1_slave")
print("w1_slave ", w1_slave)
temperature = getData(devs[0], "temperature")
print("temperature ", temperature)
temperature = int(temperature) / 1000
print("temperature ", temperature, "C")
alarms = getData(devs[0], "alarms")
print("Alarms ", alarms)

Page 240

from ctypes import *
from socket import *
from struct import *
import os
import subprocess
import io
from time import sleep


NLMSG_DONE = 0x3

CN_W1_IDX = 0x3
CN_W1_VAL = 0x1

W1_SLAVE_ADD = 0
W1_SLAVE_REMOVE = 1
W1_MASTER_ADD = 2
W1_MASTER_REMOVE = 3
W1_MASTER_CMD = 4
W1_SLAVE_CMD = 5
W1_LIST_MASTERS = 6

W1_CMD_READ = 0
W1_CMD_WRITE = 1
W1_CMD_SEARCH = 2
W1_CMD_ALARM_SEARCH = 3
W1_CMD_TOUCH = 4
W1_CMD_RESET = 5
W1_CMD_SLAVE_ADD = 6
W1_CMD_SLAVE_REMOVE = 7
W1_CMD_LIST_SLAVES = 8
W1_CMD_MAX = 9

NETLINK_CONNECTOR = 11

nl_seq = 0


class nlmsghdr(Structure):
    _fields_ = [("nlmsg_len", c_uint32),
                ("nlmsg_type", c_uint16),
                ("nlmsg_flags", c_uint16),
                ("nlmsg_seq", c_uint32),
                ("nlmsg_pid", c_uint32)
                ]


cnh = nlmsghdr()
cnh.nlmsg_seq = c_uint32(nl_seq)
nl_seq += 1
cnh.nlmsg_pid = c_uint32(os.getpid())
cnh.nlmsg_type = c_uint16(NLMSG_DONE)
cnh.nlmsg_flags = c_uint16(0)


class cn_msg(Structure):
    _fields_ = [("idx", c_uint32),
                ("val", c_uint32),
                ("seq", c_uint32),
                ("ack", c_uint32),
                ("len", c_uint16),
                ("flags", c_uint16),
                ]


cmsg = cn_msg()
cmsg.idx = c_uint32(CN_W1_IDX)
cmsg.val = c_uint32(CN_W1_VAL)
cmsg.seq = c_uint32(cnh.nlmsg_seq)
cmsg.ack = c_uint32(0)


class w1_netlink_msg(Structure):
    _fields_ = [("type", c_uint8),
                ("status", c_uint8),
                ("len", c_uint16),
                ("id", c_uint8*8),
                ]


msg = w1_netlink_msg()
msg.type = c_uint8(W1_MASTER_CMD)
msg.id = (c_uint8*8).from_buffer_copy(c_uint64(1))


class w1_netlink_cmd(Structure):
    _fields_ = [("cmd", c_uint8),
                ("res", c_uint8),
                ("len", c_uint16),
                ]


w1_cmd = w1_netlink_cmd()
w1_cmd.cmd = c_uint8(W1_CMD_SEARCH)
w1_cmd.len = c_uint16(0)


msg.len = c_uint16(len(bytearray(w1_cmd)) + w1_cmd.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)


s = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR)
s.connect((0, AF_NETLINK))

buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg)+bytearray(w1_cmd)
n = s.send(buffer)
print("Bytes sent", n, flush=True)
buffer2 = s.recv(65535)
print("Bytes received", len(buffer2), flush=True)

p = 0
cn_cnhr = nlmsghdr.from_buffer_copy(buffer2, p)
p = p+len(bytes(cn_cnhr))
cmsgr = cn_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(cmsgr))
msgr = w1_netlink_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(msgr))
w1_cmdr = w1_netlink_cmd.from_buffer_copy(buffer2, p)

num = int(w1_cmdr.len/8)
print("number of slaves", num)

p = p+len(bytes(w1_cmdr))
slaves = []
for i in range(num):
    slaves.append(c_uint64.from_buffer_copy(buffer2, p+i*8).value)
print([hex(slave) for slave in slaves])

# GET ACK
buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)

# READ TEMPERATURE
# SET UP SLAVE COMMAND
print()
print("Read Temperature")

msg.type = W1_SLAVE_CMD

id = slaves[0].to_bytes(8, "little")
msg.id = (c_uint8*8).from_buffer_copy(id)


# SETUP AND SEND WRITE 0x44 = CONVERT
w1_cmd.cmd = c_uint8(W1_CMD_WRITE)
w1_cmd_data = c_uint8(0x44)


cnh.nlmsg_seq = c_uint32(nl_seq)
nl_seq += 1
cmsg.seq = c_uint32(cnh.nlmsg_seq)

w1_cmd.len = c_uint16(1)
msg.len = c_uint16(len(bytearray(w1_cmd)) + w1_cmd.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)

buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg) + \
    bytearray(w1_cmd)+bytearray(w1_cmd_data)

n = s.send(buffer)
print("Bytes sent", n, flush=True)
buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)
sleep(1)

# READ SCRATCH PAD
cnh.nlmsg_seq = nl_seq
nl_seq += 1
cmsg.seq = cnh.nlmsg_seq

w1_cmd.cmd = W1_CMD_WRITE
w1_cmd_data = c_uint8(0xBE)

w1_cmd2 = w1_netlink_cmd()
w1_cmd2.cmd = c_uint8(W1_CMD_READ)


w1_cmd.len = c_uint16(1)
w1_cmd2.len = c_uint16(9)
msg.len = c_uint16(len(bytearray(w1_cmd)) +
                   len(bytearray(w1_cmd2)) + w1_cmd.len+w1_cmd2.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)

w1_cmd2_data = (c_uint8*9)(0)
buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg)+bytearray(w1_cmd) + \
    bytearray(w1_cmd_data)+bytearray(w1_cmd2)+bytearray(w1_cmd2_data)
n = s.send(buffer)
print("Bytes sent", n, flush=True)

buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)

buffer2 = s.recv(65535)
print("length of data received ", len(buffer2), flush=True)
p = 0
cn_cnhr = nlmsghdr.from_buffer_copy(buffer2, p)
# print(cn_cnhr.nlmsg_len)
p = p+len(bytes(cn_cnhr))
cmsgr = cn_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(cmsgr))
msgr = w1_netlink_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(msgr))
w1_cmdr = w1_netlink_cmd.from_buffer_copy(buffer2, p)
p = p+len(bytes(w1_cmdr))

temp = c_uint16.from_buffer_copy(buffer2, p)

print("number of bytes found ", w1_cmdr.len, flush=True)
print(temp.value/16)
 
 

 

VSCODE TASKS

 

settings.json

which has to be customized to your IP, user name and folder:

{
    "sshUser": "pi",
    "sshEndpoint": "192.168.11.151",
    "remoteDirectory": "/home/pi/Documents/",
}

tasks.json

{
    "version": "2.0.0",
    "tasks": [
        {
            "label": "copyToRemote",
            "type": "shell",
            "command": "scp -r ${fileDirname} ${config:sshUser}@${config:sshEndpoint}:${config:remoteDirectory}/",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
                "clear": true
            }
        },
        {
            "label": "makeRemoteWorkSpace",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} 'mkdir  ${config:remoteDirectory}'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "RunR",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} 'python3 ${config:remoteDirectory}/${relativeFileDirname}/${fileBasename}'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "RunRemote",
            "dependsOrder": "sequence",
            "dependsOn": [
                "copyToRemote",
                "RunR"
            ],
            "problemMatcher": [],
            "group": {
                "kind": "build",
                "isDefault": true
            },
        },
        {
            "label": "StopREmotePython",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} 'pkill python3'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": true,
            }
        },
        {
            "label": "wait",
            "type": "shell",
            "command": "timeout 10"
        },
        {
            "label": "tunnel",
            "type": "shell",
            "command": "ssh -2 -L 5678:localhost:5678  ${config:sshUser}@${config:sshEndpoint}",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "startDebug",
            "type": "shell",
            "command": "ssh -2 ${config:sshUser}@${config:sshEndpoint} 'nohup python3 -m debugpy --listen 0.0.0.0:5678 --wait-for-client ${config:remoteDirectory}/${relativeFileDirname}/${fileBasename} > /dev/null 2>&1 &'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "copyAndDebug",
            "dependsOrder": "sequence",
            "dependsOn": [
                "copyToRemote",
                "startDebug",
                "wait"
            ],
            "presentation": {
                "showReuseMessage": false,
            },
        },
    ]
}
launch.json
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Remote Attach",
            "type": "python",
            "request": "attach",
            "connect": {
                "host": "localhost",
                "port": 5678
            },
            "pathMappings": [
                {
                    "localRoot": "${workspaceFolder}/${relativeFileDirname}/",
                    "remoteRoot": "${config:remoteDirectory}/${relativeFileDirname}"
                }
            ],
            "preLaunchTask": "copyAndDebug",
            "postDebugTask": "StopREmotePython"
        }
    ]
}