Programming the Raspberry Pi Pico in C

pico360

 

We have decided not to make the programs available as a download because this is not the point of the book - the programs are not finished production code but something you should type in and study.

The best solution is to provide the source code of the programs in a form that can be copied and pasted into a NetBeans or VS Code project. 

The only downside is that you have to create a project to paste the code into.

To do this follow the instructions in the book.

All of the programs below were copied and pasted from working programs in the IDE. They have been formatted using the built-in formatter and hence are not identical in layout to the programs in the book. This is to make copying and pasting them easier. The programs in the book are formatted to be easy to read on the page.

If anything you consider important is missing, or if you have any requests or comments, please contact:

This email address is being protected from spambots. You need JavaScript enabled to view it. 

Page 28

#include "pico/stdlib.h"

/* Toggle GPIO 25 forever: 500 ms high, then 500 ms low. */
int main()
{
    const unsigned int pin = 25;

    gpio_init(pin);
    gpio_set_dir(pin, GPIO_OUT);

    for (;;)
    {
        gpio_put(pin, 1);
        sleep_ms(500);
        gpio_put(pin, 0);
        sleep_ms(500);
    }
}

Page 29

# Build script for the "blinky" program.
cmake_minimum_required(VERSION 3.13)
# Load pico_sdk_import.cmake (presumably copied next to this file -- confirm against the book's setup steps).
include(pico_sdk_import.cmake)
# Project uses C, C++ and assembler sources.
project(blinky C CXX ASM)
set(CMAKE_C_STANDARD 11)
set(CMAKE_CXX_STANDARD 17)
# SDK-provided setup call; defined by the imported SDK, not in this file.
pico_sdk_init()
# The executable is built from the single source file blinky.c.
add_executable(blinky
 blinky.c
)
# pico_stdlib is the only library linked; programs using other SDK libraries must list them here too.
target_link_libraries(blinky pico_stdlib)
# SDK-provided helper; presumably produces the additional output files used to flash the board -- confirm in the SDK docs.
pico_add_extra_outputs(blinky)

Page 34

{
// Debug launch configuration using the cortex-debug type with an OpenOCD server.
"version": "0.2.0",
"configurations": [
{
"name": "Cortex Debug",
"cwd": "${workspaceRoot}",
// The program to debug is whatever the cmake.launchTargetPath command reports.
"executable": "${command:cmake.launchTargetPath}",
"request": "launch",
"type": "cortex-debug",
"servertype": "openocd",
"gdbPath": "gdb-multiarch",
"device": "RP2040",
// OpenOCD interface and target configuration files.
"configFiles": [
"interface/raspberrypi-swd.cfg",
"target/rp2040.cfg"
],
// Register description file, located via the PICO_SDK_PATH environment variable.
"svdFile": "${env:PICO_SDK_PATH}/src/rp2040/hardware_regs/rp2040.svd",
"runToMain": true,
// Give restart the same functionality as runToMain
"postRestartCommands": [
"break main",
"continue"
]
}
]
}

Page 38

{
    // Debug launch configuration using the cortex-debug type with an OpenOCD server.
    "version": "0.2.0",
    "configurations": [        

        {
            "name": "Pico Debug",
            "cwd": "${workspaceRoot}",
            // The program to debug is whatever the cmake.launchTargetPath command reports.
            "executable": "${command:cmake.launchTargetPath}",
            "request": "launch",
            "type": "cortex-debug",
            "servertype": "openocd",
            "gdbPath": "arm-none-eabi-gdb",
            "device": "RP2040",
            // OpenOCD interface and target configuration files (picoprobe interface here).
            "configFiles": [
                "interface/picoprobe.cfg",
                "target/rp2040.cfg"
            ],
            // Register description file, located via the PICO_SDK_PATH environment variable.
            "svdFile": "${env:PICO_SDK_PATH}/src/rp2040/hardware_regs/rp2040.svd",
            "runToMain": true,
            // Work around for stopping at main on restart
            "postRestartCommands": [
                "break main",
                "continue"
            ]
        }
    ]
}

Page 46

#include "pico/stdlib.h"

/* Drive GPIO 22 as an output, alternating high and low once per second. */
int main() {
    const unsigned int pin = 22;

    gpio_init(pin);
    gpio_set_dir(pin, true);

    /* Starts high; the level flips after every 1000 ms delay. */
    for (bool level = true;; level = !level) {
        gpio_put(pin, level);
        sleep_ms(1000);
    }
}

Page 81

#include "pico/stdlib.h"
/*
 * Mirror the inverse of GPIO 21 onto GPIO 22: the output is low while the
 * input reads high and high while it reads low. GPIO 21 has its pull-up on.
 */
int main() {
    enum { IN_PIN = 21, OUT_PIN = 22 };

    gpio_set_function(IN_PIN, GPIO_FUNC_SIO);
    gpio_set_dir(IN_PIN, false);
    gpio_pull_up(IN_PIN);

    gpio_set_function(OUT_PIN, GPIO_FUNC_SIO);
    gpio_set_dir(OUT_PIN, true);

    for (;;) {
        gpio_put(OUT_PIN, !gpio_get(IN_PIN));
    }
}

Page 83

#include "pico/stdlib.h"
/*
 * Wait for GPIO 21 to read non-zero and then stop reading 1, then hold
 * GPIO 22 high for one second. GPIO 21 is an input with its pull-down on.
 */
int main() {
    enum { IN_PIN = 21, OUT_PIN = 22 };

    gpio_set_function(IN_PIN, GPIO_FUNC_SIO);
    gpio_set_dir(IN_PIN, false);
    gpio_pull_down(IN_PIN);

    gpio_set_function(OUT_PIN, GPIO_FUNC_SIO);
    gpio_set_dir(OUT_PIN, true);
    gpio_put(OUT_PIN, 0);

    for (;;) {
        while (!gpio_get(IN_PIN)) {
            /* spin while the input reads 0 */
        }
        while (gpio_get(IN_PIN) == 1) {
            /* spin while the input reads 1 */
        }
        gpio_put(OUT_PIN, 1);
        sleep_ms(1000);
        gpio_put(OUT_PIN, 0);
    }
}

Page 84

#include "pico/stdlib.h"
/*
 * Time how long GPIO 21 stays high and answer on GPIO 22:
 *  - shorter than 2 s: one 1-second pulse;
 *  - otherwise: ten 100 ms on / 100 ms off flashes.
 */
int main() {
    enum { IN_PIN = 21, OUT_PIN = 22 };

    gpio_set_function(IN_PIN, GPIO_FUNC_SIO);
    gpio_set_dir(IN_PIN, false);
    gpio_pull_down(IN_PIN);

    gpio_set_function(OUT_PIN, GPIO_FUNC_SIO);
    gpio_set_dir(OUT_PIN, true);
    gpio_put(OUT_PIN, 0);

    for (;;) {
        while (!gpio_get(IN_PIN)) {
            /* spin while the input reads 0 */
        }
        const uint64_t went_high = time_us_64();
        sleep_ms(1); /* 1 ms pause before the input is polled again */
        while (gpio_get(IN_PIN) == 1) {
            /* spin while the input reads 1 */
        }
        const uint64_t high_us = time_us_64() - went_high;

        if (high_us >= 2000 * 1000) {
            for (int n = 10; n > 0; n--) {
                gpio_put(OUT_PIN, 1);
                sleep_ms(100);
                gpio_put(OUT_PIN, 0);
                sleep_ms(100);
            }
            continue;
        }

        gpio_put(OUT_PIN, 1);
        sleep_ms(1000);
        gpio_put(OUT_PIN, 0);
    }
}

Page 86

#include <stdio.h>
#include "pico/stdlib.h"

/*
 * Measure the width of a high pulse on GPIO 22 and print it in microseconds,
 * then pause one second before measuring again.
 */
int main() {
    const unsigned int pin = 22;

    stdio_init_all();
    gpio_set_function(pin, GPIO_FUNC_SIO);
    gpio_set_dir(pin, false);

    for (;;) {
        /* Let any pulse already in progress finish, then catch the next one. */
        while (gpio_get(pin) == 1) {
        }
        while (!gpio_get(pin)) {
        }
        const uint64_t rose_at = time_us_64();
        while (gpio_get(pin) == 1) {
        }
        const uint64_t width_us = time_us_64() - rose_at;

        printf("%llu\n", width_us);
        sleep_ms(1000);
    }
}

Page 89

#include <stdio.h>
#include "pico/stdlib.h"

int main()
{
    stdio_init_all();
    gpio_set_function(22, GPIO_FUNC_SIO);
    gpio_set_dir(22, false);
    uint64_t t;
    int s = 0;
    int i;
    int count = 0;
    while (true)
    {
        i = gpio_get(22);
        t = time_us_64() + 100;
        switch (s)
        {
        case 0: //button not pushed
            if (i)
            {
                s = 1;
                count++;
                printf("Button Push %d \n\r", count);
            }
            break;
        case 1: //Button pushed
            if (!i)
            {
                s = 0;
            }
            break;
        default:
            s = 0;
        }
        sleep_until((absolute_time_t){t});
    }
}

Page 91

#include <stdio.h>
#include "pico/stdlib.h"
int main()
{
    stdio_init_all();
    gpio_set_function(22, GPIO_FUNC_SIO);
    gpio_set_dir(22, false);
    uint64_t t;
    uint64_t tpush, twait;
    int s = 0, i;
    int count = 0;
    while (true)
    {
        i = gpio_get(22);
        t = time_us_64();
        switch (s)
        {
        case 0: //button not pushed
            if (i)
            {
                s = 1;
                tpush = t;
            }
            break;
        case 1: //Button pushed
            if (!i)
            {
                s = 0;
                if ((t - tpush) > 2000000)
                {
                    printf("Button held \n\r");
                }
                else
                {
                    printf("Button pushed \n\r");
                }
                fflush(stdout);
            }
            break;
        default:
            s = 0;
        }
        sleep_until((absolute_time_t){t + 1000});
    }
}

Page 93

#include <stdio.h>
#include "pico/stdlib.h"

int main()
{
    stdio_init_all();
    gpio_set_function(22, GPIO_FUNC_SIO);
    gpio_set_dir(22, false);
    gpio_set_function(21, GPIO_FUNC_SIO);
    gpio_set_dir(21, true);
    gpio_set_function(20, GPIO_FUNC_SIO);
    gpio_set_dir(20, true);
    gpio_set_function(19, GPIO_FUNC_SIO);
    gpio_set_dir(19, true);

    gpio_put(19, 1);
    gpio_put(20, 0);
    gpio_put(21, 0);

    uint64_t t, tpush, twait;
    int s = 0;
    int buttonState = gpio_get(22);
    int edge;
    int buttonNow;

    while (true)
    {
        t = time_us_64();
        buttonNow = gpio_get(22);
        edge = buttonState - buttonNow;
        buttonState = buttonNow;
        switch (s)
        {
        case 0:
            if (edge == 1)
            {
                s = 1;
                gpio_put(19, 0);
                gpio_put(20, 1);
                gpio_put(21, 0);
            }
            break;
        case 1:
            if (edge == 1)
            {
                s = 2;
                gpio_put(19, 0);
                gpio_put(20, 0);
                gpio_put(21, 1);
            }
            break;
        case 2:
            if (edge == 1)
            {
                s = 0;
                gpio_put(19, 1);
                gpio_put(20, 0);
                gpio_put(21, 0);
            }
            break;

        default:
            s = 0;
        }
        sleep_until((absolute_time_t){t + 1000});
    }
}

Page 100

/*
 * Return the four raw event bits recorded for one GPIO line.
 *
 * Each iobank0_hw->intr[] word carries 4 bits for each of 8 GPIOs, so the
 * word index is gpio / 8 and the nibble sits at bit 4 * (gpio % 8).
 * The register is shifted down first and then masked with an unsigned
 * constant; building the mask with "0xF << 28" on a signed int overflows.
 */
uint32_t gpio_get_events(uint gpio)
{
    const uint shift = 4u * (gpio % 8u);
    return (iobank0_hw->intr[gpio / 8u] >> shift) & 0xFu;
}
/* Clear the given event bits for a GPIO by acknowledging them. */
void gpio_clear_events(uint gpio, uint32_t events) {
    gpio_acknowledge_irq(gpio, events);
}

Page 101

#include <stdio.h>
#include "pico/stdlib.h"

/*
 * Prompt, wait 20 s, then sample GPIO 22 once and report its level.
 * Only the level at the moment of sampling is seen.
 */
int main() {
    const unsigned int pin = 22;

    stdio_init_all();
    gpio_set_function(pin, GPIO_FUNC_SIO);
    gpio_set_dir(pin, false);
    gpio_pull_down(pin);

    printf("Press Button\n");
    sleep_ms(20000);

    const char *report = "Button Not Pressed\n";
    if (gpio_get(pin)) {
        report = "Button Pressed\n";
    }
    printf("%s", report);
}

Page 101

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/structs/iobank0.h"

/*
 * Return the four raw event bits recorded for one GPIO line.
 *
 * Each iobank0_hw->intr[] word carries 4 bits for each of 8 GPIOs, so the
 * word index is gpio / 8 and the nibble sits at bit 4 * (gpio % 8).
 * The register is shifted down first and then masked with an unsigned
 * constant; building the mask with "0xF << 28" on a signed int overflows.
 */
uint32_t gpio_get_events(uint gpio)
{
    const uint shift = 4u * (gpio % 8u);
    return (iobank0_hw->intr[gpio / 8u] >> shift) & 0xFu;
}
/* Clear the given event bits for a GPIO by acknowledging them. */
void gpio_clear_events(uint gpio, uint32_t events) {
    gpio_acknowledge_irq(gpio, events);
}

/*
 * Prompt, wait 20 s, then report whether a rising-edge event was recorded
 * on GPIO 22 at any point during the wait.
 */
int main() {
    const unsigned int pin = 22;

    stdio_init_all();
    gpio_set_function(pin, GPIO_FUNC_SIO);
    gpio_set_dir(pin, false);
    gpio_pull_down(pin);

    printf("Press Button\n");
    gpio_clear_events(pin, GPIO_IRQ_EDGE_RISE); /* discard events from before the prompt */
    sleep_ms(20000);
    const int32_t recorded = gpio_get_events(pin);
    gpio_clear_events(pin, GPIO_IRQ_EDGE_RISE);

    const char *report = (recorded & GPIO_IRQ_EDGE_RISE) ? "Button Pressed\n"
                                                         : "Button Not Pressed\n";
    printf("%s", report);
}

Page 103

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/structs/iobank0.h"

/*
 * Return the four raw event bits recorded for one GPIO line.
 *
 * Each iobank0_hw->intr[] word carries 4 bits for each of 8 GPIOs, so the
 * word index is gpio / 8 and the nibble sits at bit 4 * (gpio % 8).
 * The register is shifted down first and then masked with an unsigned
 * constant; building the mask with "0xF << 28" on a signed int overflows.
 */
uint32_t gpio_get_events(uint gpio)
{
    const uint shift = 4u * (gpio % 8u);
    return (iobank0_hw->intr[gpio / 8u] >> shift) & 0xFu;
}
/* Clear the given event bits for a GPIO by acknowledging them. */
void gpio_clear_events(uint gpio, uint32_t events) {
    gpio_acknowledge_irq(gpio, events);
}

/*
 * Measure the time between a rising-edge event and the following
 * falling-edge event on GPIO 22, print it in microseconds, wait one second
 * and repeat.
 */
int main() {
    const unsigned int pin = 22;
    const uint32_t both_edges = GPIO_IRQ_EDGE_RISE | GPIO_IRQ_EDGE_FALL;

    stdio_init_all();
    gpio_set_function(pin, GPIO_FUNC_SIO);
    gpio_set_dir(pin, false);
    gpio_pull_down(pin);

    for (;;) {
        gpio_clear_events(pin, both_edges);
        while ((gpio_get_events(pin) & GPIO_IRQ_EDGE_RISE) == 0) {
        }
        const uint64_t rose_at = time_us_64();

        gpio_clear_events(pin, both_edges);
        while ((gpio_get_events(pin) & GPIO_IRQ_EDGE_FALL) == 0) {
        }
        const uint64_t width_us = time_us_64() - rose_at;

        printf("%llu\n", width_us);
        sleep_ms(1000);
    }
}

Page 105

#include <stdio.h>
#include "pico/stdlib.h"

/* Microseconds between the two most recent handler calls (time since boot after the first call). */
static uint64_t t;

/*
 * GPIO interrupt callback: prints the GPIO number, the event bits in hex and
 * the time in microseconds since the previous call.
 *
 * The previous timestamp is kept separately from t. Computing
 * "t = time_us_64() - t" subtracts the last *interval*, not the last
 * timestamp, so only the second call would print a real interval.
 * NOTE(review): assumes the interval between interrupts is what is wanted.
 *
 * Arguments are cast to the exact types the format specifiers require.
 */
void MyIRQHandler(uint gpio, uint32_t events)
{
    static uint64_t previous; /* 0 until the first call */
    const uint64_t now = time_us_64();

    t = now - previous;
    previous = now;
    printf("GPIO %u %X %llu \n", (unsigned int)gpio, (unsigned int)events,
           (unsigned long long)t);
}

/*
 * Configure GPIO 22 as a pulled-down input, register MyIRQHandler for both
 * rising and falling edges, then idle forever.
 */
int main() {
    const unsigned int pin = 22;
    const uint32_t both_edges = GPIO_IRQ_EDGE_RISE | GPIO_IRQ_EDGE_FALL;

    stdio_init_all();
    gpio_set_function(pin, GPIO_FUNC_SIO);
    gpio_set_dir(pin, false);
    gpio_pull_down(pin);

    gpio_set_irq_enabled_with_callback(pin, both_edges, true, &MyIRQHandler);
    for (;;) {
    }
    return 0;
}

Page 122  remember to add hardware_pwm to the CMakeLists.txt file

#include "pico/stdlib.h"
#include "hardware/pwm.h"

/*
 * Configure a PWM slice for an output frequency of f Hz and set one channel
 * to a duty cycle of d percent. A 125 MHz PWM clock is assumed.
 *
 * f == 0 is rejected (returns 0, nothing is written). Frequencies that the
 * divider/counter cannot reach and duty values outside 0..100 are clamped.
 *
 * Returns the wrap value written to the slice.
 */
uint32_t pwm_set_freq_duty(uint slice_num, uint chan, uint32_t f, int d)
{
    const uint32_t clock = 125000000;

    if (f == 0)
        return 0; /* would otherwise divide by zero */
    if (d < 0)
        d = 0;
    if (d > 100)
        d = 100;

    /* Ceiling of clock / (f * 4096), in 64 bits so f * 4096 cannot wrap. */
    const uint64_t span = (uint64_t)f * 4096u;
    uint32_t divider16 = (uint32_t)(clock / span) + (clock % span != 0);
    if (divider16 < 16)
        divider16 = 16; /* never divide by less than 1.0 */
    if (divider16 > 4095)
        divider16 = 4095; /* 8-bit integer part + 4-bit fraction */

    /* Counter ticks per period; guarded so the "- 1" cannot underflow. */
    const uint32_t ticks = clock * 16 / divider16 / f;
    uint32_t wrap = ticks > 0 ? ticks - 1 : 0;
    if (wrap > 65535)
        wrap = 65535; /* pwm_set_wrap takes a 16-bit value */

    pwm_set_clkdiv_int_frac(slice_num, divider16 / 16, divider16 & 0xF);
    pwm_set_wrap(slice_num, wrap);
    pwm_set_chan_level(slice_num, chan, wrap * (uint32_t)d / 100);
    return wrap;
}

int main()
{
    gpio_set_function(22, GPIO_FUNC_PWM);
    uint slice_num = pwm_gpio_to_slice_num(22);
    uint chan = pwm_gpio_to_channel(22);
    pwm_set_freq_duty(slice_num, chan, 50, 75);
    pwm_set_enabled(slice_num, true);
    return 0;
}

Page 124 remember to add hardware_pwm to the CMakeLists.txt file

#include "pico/stdlib.h"
#include "hardware/pwm.h"

/*
 * Configure a PWM slice for an output frequency of f Hz and set one channel
 * to a duty cycle of d percent. A 125 MHz PWM clock is assumed.
 *
 * f == 0 is rejected (returns 0, nothing is written). Frequencies that the
 * divider/counter cannot reach and duty values outside 0..100 are clamped.
 *
 * Returns the wrap value written to the slice.
 */
uint32_t pwm_set_freq_duty(uint slice_num, uint chan, uint32_t f, int d)
{
    const uint32_t clock = 125000000;

    if (f == 0)
        return 0; /* would otherwise divide by zero */
    if (d < 0)
        d = 0;
    if (d > 100)
        d = 100;

    /* Ceiling of clock / (f * 4096), in 64 bits so f * 4096 cannot wrap. */
    const uint64_t span = (uint64_t)f * 4096u;
    uint32_t divider16 = (uint32_t)(clock / span) + (clock % span != 0);
    if (divider16 < 16)
        divider16 = 16; /* never divide by less than 1.0 */
    if (divider16 > 4095)
        divider16 = 4095; /* 8-bit integer part + 4-bit fraction */

    /* Counter ticks per period; guarded so the "- 1" cannot underflow. */
    const uint32_t ticks = clock * 16 / divider16 / f;
    uint32_t wrap = ticks > 0 ? ticks - 1 : 0;
    if (wrap > 65535)
        wrap = 65535; /* pwm_set_wrap takes a 16-bit value */

    pwm_set_clkdiv_int_frac(slice_num, divider16 / 16, divider16 & 0xF);
    pwm_set_wrap(slice_num, wrap);
    pwm_set_chan_level(slice_num, chan, wrap * (uint32_t)d / 100);
    return wrap;
}
/* Read back the wrap (top) value currently stored for the given PWM slice. */
uint32_t pwm_get_wrap(uint slice_num)
{
    /* slice_num is unsigned, so only the upper bound needs checking. */
    valid_params_if(PWM, slice_num < NUM_PWM_SLICES);
    const uint32_t top = pwm_hw->slice[slice_num].top;
    return top;
}

/* Set a channel's level to d percent of the slice's current wrap value. */
void pwm_set_duty(uint slice_num, uint chan, int d) {
    const uint32_t top = pwm_get_wrap(slice_num);
    pwm_set_chan_level(slice_num, chan, top * d / 100);
}

int main()
{
    gpio_set_function(20, GPIO_FUNC_PWM);
    gpio_set_function(21, GPIO_FUNC_PWM);
    uint slice_num = pwm_gpio_to_slice_num(20);
    uint chan20 = pwm_gpio_to_channel(20);
    uint chan21 = pwm_gpio_to_channel(21);
    uint wrap = pwm_set_freq_duty(slice_num, chan20, 50, 75);
    pwm_set_duty(slice_num, chan21, 25);

    pwm_set_enabled(slice_num, true);

    return 0;
}

Page 126 remember to add hardware_pwm to the CMakeLists.txt file

#include "pico/stdlib.h"
#include "hardware/pwm.h"

/*
 * Configure a PWM slice for an output frequency of f Hz and set one channel
 * to a duty cycle of d percent. A 125 MHz PWM clock is assumed.
 *
 * f == 0 is rejected (returns 0, nothing is written). Frequencies that the
 * divider/counter cannot reach and duty values outside 0..100 are clamped.
 *
 * Returns the wrap value written to the slice.
 */
uint32_t pwm_set_freq_duty(uint slice_num, uint chan,
                           uint32_t f, int d)
{
    const uint32_t clock = 125000000;

    if (f == 0)
        return 0; /* would otherwise divide by zero */
    if (d < 0)
        d = 0;
    if (d > 100)
        d = 100;

    /* Ceiling of clock / (f * 4096), in 64 bits so f * 4096 cannot wrap. */
    const uint64_t span = (uint64_t)f * 4096u;
    uint32_t divider16 = (uint32_t)(clock / span) + (clock % span != 0);
    if (divider16 < 16)
        divider16 = 16; /* never divide by less than 1.0 */
    if (divider16 > 4095)
        divider16 = 4095; /* 8-bit integer part + 4-bit fraction */

    /* Counter ticks per period; guarded so the "- 1" cannot underflow. */
    const uint32_t ticks = clock * 16 / divider16 / f;
    uint32_t wrap = ticks > 0 ? ticks - 1 : 0;
    if (wrap > 65535)
        wrap = 65535; /* pwm_set_wrap takes a 16-bit value */

    pwm_set_clkdiv_int_frac(slice_num, divider16 / 16, divider16 & 0xF);
    pwm_set_wrap(slice_num, wrap);
    pwm_set_chan_level(slice_num, chan, wrap * (uint32_t)d / 100);
    return wrap;
}
/* Read back the wrap (top) value currently stored for the given PWM slice. */
uint32_t pwm_get_wrap(uint slice_num) {
    /* slice_num is unsigned, so only the upper bound needs checking. */
    valid_params_if(PWM, slice_num < NUM_PWM_SLICES);
    const uint32_t top = pwm_hw->slice[slice_num].top;
    return top;
}

/* Set a channel's level to d percent of the slice's current wrap value. */
void pwm_set_duty(uint slice_num, uint chan, int d) {
    const uint32_t top = pwm_get_wrap(slice_num);
    pwm_set_chan_level(slice_num, chan, top * d / 100);
}

int main()
{
    gpio_set_function(20, GPIO_FUNC_PWM);

    uint slice_num = pwm_gpio_to_slice_num(20);
    uint chan20 = pwm_gpio_to_channel(20);

    uint wrap = pwm_set_freq_duty(slice_num, chan20, 50, 50);

    pwm_set_enabled(slice_num, true);
    while (true)
    {
        pwm_set_duty(slice_num, chan20, 25);
        pwm_set_duty(slice_num, chan20, 50);
    }
    return 0;
}

Page 122 remember to add hardware_pwm to the CMakeLists.txt file

#include "pico/stdlib.h"
#include "hardware/pwm.h"

/*
 * Configure a PWM slice for an output frequency of f Hz and set one channel
 * to a duty cycle of d percent. A 125 MHz PWM clock is assumed.
 *
 * f == 0 is rejected (returns 0, nothing is written). Frequencies that the
 * divider/counter cannot reach and duty values outside 0..100 are clamped.
 *
 * Returns the wrap value written to the slice.
 */
uint32_t pwm_set_freq_duty(uint slice_num, uint chan,
                           uint32_t f, int d)
{
    const uint32_t clock = 125000000;

    if (f == 0)
        return 0; /* would otherwise divide by zero */
    if (d < 0)
        d = 0;
    if (d > 100)
        d = 100;

    /* Ceiling of clock / (f * 4096), in 64 bits so f * 4096 cannot wrap. */
    const uint64_t span = (uint64_t)f * 4096u;
    uint32_t divider16 = (uint32_t)(clock / span) + (clock % span != 0);
    if (divider16 < 16)
        divider16 = 16; /* never divide by less than 1.0 */
    if (divider16 > 4095)
        divider16 = 4095; /* 8-bit integer part + 4-bit fraction */

    /* Counter ticks per period; guarded so the "- 1" cannot underflow. */
    const uint32_t ticks = clock * 16 / divider16 / f;
    uint32_t wrap = ticks > 0 ? ticks - 1 : 0;
    if (wrap > 65535)
        wrap = 65535; /* pwm_set_wrap takes a 16-bit value */

    pwm_set_clkdiv_int_frac(slice_num, divider16 / 16, divider16 & 0xF);
    pwm_set_wrap(slice_num, wrap);
    pwm_set_chan_level(slice_num, chan, wrap * (uint32_t)d / 100);
    return wrap;
}
/* Read back the wrap (top) value currently stored for the given PWM slice. */
uint32_t pwm_get_wrap(uint slice_num) {
    /* slice_num is unsigned, so only the upper bound needs checking. */
    valid_params_if(PWM, slice_num < NUM_PWM_SLICES);
    const uint32_t top = pwm_hw->slice[slice_num].top;
    return top;
}

/* Set a channel's level to d percent of the slice's current wrap value. */
void pwm_set_duty(uint slice_num, uint chan, int d) {
    const uint32_t top = pwm_get_wrap(slice_num);
    pwm_set_chan_level(slice_num, chan, top * d / 100);
}

int main()
{
    gpio_set_function(20, GPIO_FUNC_PWM);
    uint slice_num = pwm_gpio_to_slice_num(20);
    uint chan20 = pwm_gpio_to_channel(20);
    uint wrap = pwm_set_freq_duty(slice_num, chan20, 50, 50);
    pwm_set_enabled(slice_num, true);
    while (true)
    {
        pwm_set_duty(slice_num, chan20, 25);
        while (pwm_get_counter(slice_num))
        {
        };
        pwm_set_duty(slice_num, chan20, 50);
        while (pwm_get_counter(slice_num))
        {
        };
    }
    return 0;
}

Page 129  remember to add hardware_pwm to the CMakeLists.txt file

#include "pico/stdlib.h"
#include "hardware/pwm.h"
#include "hardware/irq.h"

/*
 * Configure a PWM slice for an output frequency of f Hz and set one channel
 * to a duty cycle of d percent. A 125 MHz PWM clock is assumed.
 *
 * f == 0 is rejected (returns 0, nothing is written). Frequencies that the
 * divider/counter cannot reach and duty values outside 0..100 are clamped.
 *
 * Returns the wrap value written to the slice.
 */
uint32_t pwm_set_freq_duty(uint slice_num, uint chan, uint32_t f, int d)
{
    const uint32_t clock = 125000000;

    if (f == 0)
        return 0; /* would otherwise divide by zero */
    if (d < 0)
        d = 0;
    if (d > 100)
        d = 100;

    /* Ceiling of clock / (f * 4096), in 64 bits so f * 4096 cannot wrap. */
    const uint64_t span = (uint64_t)f * 4096u;
    uint32_t divider16 = (uint32_t)(clock / span) + (clock % span != 0);
    if (divider16 < 16)
        divider16 = 16; /* never divide by less than 1.0 */
    if (divider16 > 4095)
        divider16 = 4095; /* 8-bit integer part + 4-bit fraction */

    /* Counter ticks per period; guarded so the "- 1" cannot underflow. */
    const uint32_t ticks = clock * 16 / divider16 / f;
    uint32_t wrap = ticks > 0 ? ticks - 1 : 0;
    if (wrap > 65535)
        wrap = 65535; /* pwm_set_wrap takes a 16-bit value */

    pwm_set_clkdiv_int_frac(slice_num, divider16 / 16, divider16 & 0xF);
    pwm_set_wrap(slice_num, wrap);
    pwm_set_chan_level(slice_num, chan, wrap * (uint32_t)d / 100);
    return wrap;
}
/* Read back the wrap (top) value currently stored for the given PWM slice. */
uint32_t pwm_get_wrap(uint slice_num) {
    /* slice_num is unsigned, so only the upper bound needs checking. */
    valid_params_if(PWM, slice_num < NUM_PWM_SLICES);
    const uint32_t top = pwm_hw->slice[slice_num].top;
    return top;
}

/* Set a channel's level to d percent of the slice's current wrap value. */
void pwm_set_duty(uint slice_num, uint chan, int d) {
    const uint32_t top = pwm_get_wrap(slice_num);
    pwm_set_chan_level(slice_num, chan, top * d / 100);
}

/* Shared between main and the interrupt handler. */
uint slice_num; /* PWM slice being driven */
uint chan20;    /* channel of that slice */
uint state = 0; /* toggles between 0 and all-ones on every interrupt */

/*
 * PWM wrap interrupt handler: acknowledges the interrupt and alternates the
 * duty cycle, 50% when state is zero and 25% otherwise.
 */
void MyIRQHandler() {
    pwm_clear_irq(slice_num);
    pwm_set_duty(slice_num, chan20, state ? 25 : 50);
    state = ~state;
}

/*
 * Set up GPIO 20 for PWM at 100 kHz / 25% duty, route the PWM wrap
 * interrupt to MyIRQHandler, enable the slice and idle forever.
 */
int main() {
    const unsigned int pin = 20;

    gpio_set_function(pin, GPIO_FUNC_PWM);
    slice_num = pwm_gpio_to_slice_num(pin);
    chan20 = pwm_gpio_to_channel(pin);

    /* Interrupt plumbing is finished before the slice is started. */
    pwm_clear_irq(slice_num);
    pwm_set_irq_enabled(slice_num, true);
    irq_set_exclusive_handler(PWM_IRQ_WRAP, MyIRQHandler);
    irq_set_enabled(PWM_IRQ_WRAP, true);

    pwm_set_freq_duty(slice_num, chan20, 100000, 25);
    pwm_set_enabled(slice_num, true);

    for (;;) {
    }
    return 0;
}

Page 131  remember to add hardware_pwm to the CMakeLists.txt file

#include "pico/stdlib.h"
#include "hardware/pwm.h"
#include "hardware/irq.h"
#include "math.h"

/* Read back the wrap (top) value currently stored for the given PWM slice. */
uint32_t pwm_get_wrap(uint slice_num) {
    /* slice_num is unsigned, so only the upper bound needs checking. */
    valid_params_if(PWM, slice_num < NUM_PWM_SLICES);
    const uint32_t top = pwm_hw->slice[slice_num].top;
    return top;
}

/* Set a channel's level to d percent of the slice's current wrap value. */
void pwm_set_duty(uint slice_num, uint chan, int d) {
    const uint32_t top = pwm_get_wrap(slice_num);
    pwm_set_chan_level(slice_num, chan, top * d / 100);
}
/* Shared between main and the interrupt handler. */
uint slice_num;    /* PWM slice being driven */
uint chan20;       /* channel the handler writes to */
uint state = 0;    /* index of the next wave[] entry, 0..255 */
uint8_t wave[256]; /* duty-cycle table, filled in by main */

/*
 * PWM wrap interrupt handler: acknowledges the interrupt, applies the next
 * table entry as the duty cycle and advances the index, wrapping at 256.
 */
void MyIRQHandler() {
    pwm_clear_irq(slice_num);
    pwm_set_duty(slice_num, chan20, wave[state]);
    state = (state + 1) & 0xFFu;
}

int main()
{

    for (int i = 0; i < 256; i++)
    {
        wave[i] = (uint8_t)((128.0 + sinf((float)i * 2.0 * 3.14159 / 256.0) * 128.0) * 100.0 / 256.0);
    }

    gpio_set_function(22, GPIO_FUNC_PWM);

    slice_num = pwm_gpio_to_slice_num(22);
    uint chan22 = pwm_gpio_to_channel(22);

    pwm_clear_irq(slice_num);
    pwm_set_irq_enabled(slice_num, true);
    irq_set_exclusive_handler(PWM_IRQ_WRAP, MyIRQHandler);
    irq_set_enabled(PWM_IRQ_WRAP, true);

    pwm_set_clkdiv_int_frac(slice_num, 1, 0);
    pwm_set_wrap(slice_num, 255);

    pwm_set_enabled(slice_num, true);
    while (true)
    {
    }
    return 0;
}

Page 134 remember to add hardware_pwm to the CMakeLists.txt file

#include "pico/stdlib.h"
#include "hardware/pwm.h"

/*
 * Configure a PWM slice for an output frequency of f Hz and set one channel
 * to a duty cycle of d percent. A 125 MHz PWM clock is assumed.
 *
 * f == 0 is rejected (returns 0, nothing is written). Frequencies that the
 * divider/counter cannot reach and duty values outside 0..100 are clamped.
 *
 * Returns the wrap value written to the slice.
 */
uint32_t pwm_set_freq_duty(uint slice_num, uint chan, uint32_t f, int d)
{
    const uint32_t clock = 125000000;

    if (f == 0)
        return 0; /* would otherwise divide by zero */
    if (d < 0)
        d = 0;
    if (d > 100)
        d = 100;

    /* Ceiling of clock / (f * 4096), in 64 bits so f * 4096 cannot wrap. */
    const uint64_t span = (uint64_t)f * 4096u;
    uint32_t divider16 = (uint32_t)(clock / span) + (clock % span != 0);
    if (divider16 < 16)
        divider16 = 16; /* never divide by less than 1.0 */
    if (divider16 > 4095)
        divider16 = 4095; /* 8-bit integer part + 4-bit fraction */

    /* Counter ticks per period; guarded so the "- 1" cannot underflow. */
    const uint32_t ticks = clock * 16 / divider16 / f;
    uint32_t wrap = ticks > 0 ? ticks - 1 : 0;
    if (wrap > 65535)
        wrap = 65535; /* pwm_set_wrap takes a 16-bit value */

    pwm_set_clkdiv_int_frac(slice_num, divider16 / 16, divider16 & 0xF);
    pwm_set_wrap(slice_num, wrap);
    pwm_set_chan_level(slice_num, chan, wrap * (uint32_t)d / 100);
    return wrap;
}

/* Read back the wrap (top) value currently stored for the given PWM slice. */
uint32_t pwm_get_wrap(uint slice_num) {
    /* slice_num is unsigned, so only the upper bound needs checking. */
    valid_params_if(PWM, slice_num < NUM_PWM_SLICES);
    const uint32_t top = pwm_hw->slice[slice_num].top;
    return top;
}

/* Set a channel's level to d percent of the slice's current wrap value. */
void pwm_set_duty(uint slice_num, uint chan, int d) {
    const uint32_t top = pwm_get_wrap(slice_num);
    pwm_set_chan_level(slice_num, chan, top * d / 100);
}

int main()
{
    gpio_set_function(22, GPIO_FUNC_PWM);
    uint slice_num = pwm_gpio_to_slice_num(22);

    uint chan22 = pwm_gpio_to_channel(22);
    pwm_set_freq_duty(slice_num, chan22, 281, 50);

    pwm_set_enabled(slice_num, true);
}

Page 134 remember to add hardware_pwm to the CMakeLists.txt file

#include "pico/stdlib.h"
#include "hardware/pwm.h"

/*
 * Configure a PWM slice for an output frequency of f Hz and set one channel
 * to a duty cycle of d percent. A 125 MHz PWM clock is assumed.
 *
 * f == 0 is rejected (returns 0, nothing is written). Frequencies that the
 * divider/counter cannot reach and duty values outside 0..100 are clamped.
 *
 * Returns the wrap value written to the slice.
 */
uint32_t pwm_set_freq_duty(uint slice_num, uint chan, uint32_t f, int d)
{
    const uint32_t clock = 125000000;

    if (f == 0)
        return 0; /* would otherwise divide by zero */
    if (d < 0)
        d = 0;
    if (d > 100)
        d = 100;

    /* Ceiling of clock / (f * 4096), in 64 bits so f * 4096 cannot wrap. */
    const uint64_t span = (uint64_t)f * 4096u;
    uint32_t divider16 = (uint32_t)(clock / span) + (clock % span != 0);
    if (divider16 < 16)
        divider16 = 16; /* never divide by less than 1.0 */
    if (divider16 > 4095)
        divider16 = 4095; /* 8-bit integer part + 4-bit fraction */

    /* Counter ticks per period; guarded so the "- 1" cannot underflow. */
    const uint32_t ticks = clock * 16 / divider16 / f;
    uint32_t wrap = ticks > 0 ? ticks - 1 : 0;
    if (wrap > 65535)
        wrap = 65535; /* pwm_set_wrap takes a 16-bit value */

    pwm_set_clkdiv_int_frac(slice_num, divider16 / 16, divider16 & 0xF);
    pwm_set_wrap(slice_num, wrap);
    pwm_set_chan_level(slice_num, chan, wrap * (uint32_t)d / 100);
    return wrap;
}

/* Read back the wrap (top) value currently stored for the given PWM slice. */
uint32_t pwm_get_wrap(uint slice_num) {
    /* slice_num is unsigned, so only the upper bound needs checking. */
    valid_params_if(PWM, slice_num < NUM_PWM_SLICES);
    const uint32_t top = pwm_hw->slice[slice_num].top;
    return top;
}

/* Set a channel's level to d percent of the slice's current wrap value. */
void pwm_set_duty(uint slice_num, uint chan, int d) {
    const uint32_t top = pwm_get_wrap(slice_num);
    pwm_set_chan_level(slice_num, chan, top * d / 100);
}

int main()
{
    gpio_set_function(25, GPIO_FUNC_PWM);
    uint slice_num = pwm_gpio_to_slice_num(25);
    uint chan = pwm_gpio_to_channel(25);
    pwm_set_freq_duty(slice_num, chan, 2000, 0);
    pwm_set_enabled(slice_num, true);

    while (true)
    {
        for (int d = 0; d <= 100; d++)
        {
            pwm_set_duty(slice_num, chan, d);
            sleep_ms(50);
        }
    }
}

Page 136 remember to add hardware_pwm to the CMakeLists.txt file

#include "pico/stdlib.h"
#include "hardware/pwm.h"

/*
 * Configure a PWM slice for an output frequency of f Hz and set one channel
 * to a duty cycle of d percent. A 125 MHz PWM clock is assumed.
 *
 * f == 0 is rejected (returns 0, nothing is written). Frequencies that the
 * divider/counter cannot reach and duty values outside 0..100 are clamped.
 *
 * Returns the wrap value written to the slice.
 */
uint32_t pwm_set_freq_duty(uint slice_num, uint chan, uint32_t f, int d)
{
    const uint32_t clock = 125000000;

    if (f == 0)
        return 0; /* would otherwise divide by zero */
    if (d < 0)
        d = 0;
    if (d > 100)
        d = 100;

    /* Ceiling of clock / (f * 4096), in 64 bits so f * 4096 cannot wrap. */
    const uint64_t span = (uint64_t)f * 4096u;
    uint32_t divider16 = (uint32_t)(clock / span) + (clock % span != 0);
    if (divider16 < 16)
        divider16 = 16; /* never divide by less than 1.0 */
    if (divider16 > 4095)
        divider16 = 4095; /* 8-bit integer part + 4-bit fraction */

    /* Counter ticks per period; guarded so the "- 1" cannot underflow. */
    const uint32_t ticks = clock * 16 / divider16 / f;
    uint32_t wrap = ticks > 0 ? ticks - 1 : 0;
    if (wrap > 65535)
        wrap = 65535; /* pwm_set_wrap takes a 16-bit value */

    pwm_set_clkdiv_int_frac(slice_num, divider16 / 16, divider16 & 0xF);
    pwm_set_wrap(slice_num, wrap);
    pwm_set_chan_level(slice_num, chan, wrap * (uint32_t)d / 100);
    return wrap;
}

/* Read back the wrap (top) value currently stored for the given PWM slice. */
uint32_t pwm_get_wrap(uint slice_num) {
    /* slice_num is unsigned, so only the upper bound needs checking. */
    valid_params_if(PWM, slice_num < NUM_PWM_SLICES);
    const uint32_t top = pwm_hw->slice[slice_num].top;
    return top;
}

/* Set a channel's level to d percent of the slice's current wrap value. */
void pwm_set_duty(uint slice_num, uint chan, int d) {
    const uint32_t top = pwm_get_wrap(slice_num);
    pwm_set_chan_level(slice_num, chan, top * d / 100);
}

int main()
{
    gpio_set_function(25, GPIO_FUNC_PWM);
    uint slice_num = pwm_gpio_to_slice_num(25);
    uint chan = pwm_gpio_to_channel(25);

    pwm_set_freq_duty(slice_num, chan, 2000, 0);

    pwm_set_enabled(slice_num, true);
    int d = 0;
    while (true)
    {
        for (int b = 0; b <= 100; b++)
        {
            d = (b * b * b) / 10000;
            pwm_set_duty(slice_num, chan, d);
            sleep_ms(50);
        }
    }
}

Page 138 remember to add hardware_pwm to the CMakeLists.txt file 

#include "pico/stdlib.h"
#include "hardware/pwm.h"

int main()
{
    gpio_set_function(20, GPIO_FUNC_PWM);
    gpio_set_function(21, GPIO_FUNC_PWM);
    uint slice_num = pwm_gpio_to_slice_num(20);

    uint chanA = pwm_gpio_to_channel(20);
    uint chanB = pwm_gpio_to_channel(21);
    pwm_set_clkdiv_int_frac(slice_num, 2, 0);
    pwm_set_wrap(slice_num, 127);
    pwm_set_chan_level(slice_num, chanA, 63);
    pwm_set_clkdiv_mode(slice_num, PWM_DIV_B_RISING);
    pwm_set_enabled(slice_num, true);
}

Page 140 remember to add hardware_pwm to the CMakeLists.txt file 

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/pwm.h"
int main()
{
    stdio_init_all();
    gpio_set_function(20, GPIO_FUNC_PWM);
    gpio_set_function(21, GPIO_FUNC_PWM);
    uint slice_num = pwm_gpio_to_slice_num(20);

    uint chanA = pwm_gpio_to_channel(20);
    uint chanB = pwm_gpio_to_channel(21);

    pwm_set_clkdiv_int_frac(slice_num, 20, 0);
    int maxcount = 125000000 * 10 / 20 / 1000;

    pwm_set_wrap(slice_num, 65535);
    pwm_set_chan_level(slice_num, chanA, 100);
    pwm_set_clkdiv_mode(slice_num, PWM_DIV_B_HIGH);

    while (true)
    {
        pwm_set_enabled(slice_num, true);
        sleep_ms(10);
        pwm_set_enabled(slice_num, false);
        uint16_t count = pwm_get_counter(slice_num);
        pwm_set_counter(slice_num, 0);
        printf("count= %u  duty cycle=%d %%\n",
               count, (int)count * 100 / maxcount);
        sleep_ms(1000);
    }
}

Page 29 remember to add hardware_pwm to the CMakeLists.txt file 

#include "pico/stdlib.h"
#include "hardware/pwm.h"

/*
 * Configure a PWM slice for an output frequency of f Hz and set one channel
 * to a duty cycle of d percent. A 125 MHz PWM clock is assumed.
 *
 * f == 0 is rejected (returns 0, nothing is written). Frequencies that the
 * divider/counter cannot reach and duty values outside 0..100 are clamped.
 *
 * Returns the wrap value written to the slice.
 */
uint32_t pwm_set_freq_duty(uint slice_num, uint chan, uint32_t f, int d)
{
    const uint32_t clock = 125000000;

    if (f == 0)
        return 0; /* would otherwise divide by zero */
    if (d < 0)
        d = 0;
    if (d > 100)
        d = 100;

    /* Ceiling of clock / (f * 4096), in 64 bits so f * 4096 cannot wrap. */
    const uint64_t span = (uint64_t)f * 4096u;
    uint32_t divider16 = (uint32_t)(clock / span) + (clock % span != 0);
    if (divider16 < 16)
        divider16 = 16; /* never divide by less than 1.0 */
    if (divider16 > 4095)
        divider16 = 4095; /* 8-bit integer part + 4-bit fraction */

    /* Counter ticks per period; guarded so the "- 1" cannot underflow. */
    const uint32_t ticks = clock * 16 / divider16 / f;
    uint32_t wrap = ticks > 0 ? ticks - 1 : 0;
    if (wrap > 65535)
        wrap = 65535; /* pwm_set_wrap takes a 16-bit value */

    pwm_set_clkdiv_int_frac(slice_num, divider16 / 16, divider16 & 0xF);
    pwm_set_wrap(slice_num, wrap);
    pwm_set_chan_level(slice_num, chan, wrap * (uint32_t)d / 100);
    return wrap;
}

/* Read back the wrap (top) value currently stored for the given PWM slice. */
uint32_t pwm_get_wrap(uint slice_num) {
    /* slice_num is unsigned, so only the upper bound needs checking. */
    valid_params_if(PWM, slice_num < NUM_PWM_SLICES);
    const uint32_t top = pwm_hw->slice[slice_num].top;
    return top;
}

/* Set a channel's level to d percent of the slice's current wrap value. */
void pwm_set_duty(uint slice_num, uint chan, int d) {
    const uint32_t top = pwm_get_wrap(slice_num);
    pwm_set_chan_level(slice_num, chan, top * d / 100);
}

/* State for one PWM-driven motor; filled in by motorInit. */
typedef struct
{
    uint gpio;       /* GPIO pin carrying the PWM output */
    uint slice;      /* PWM slice that serves the pin */
    uint chan;       /* PWM channel that serves the pin */
    uint speed;      /* last duty-cycle percentage applied */
    uint freq;       /* PWM frequency passed to motorInit */
    uint resolution; /* wrap value returned by pwm_set_freq_duty */
    bool on;         /* true while the slice is enabled */
} Motor;

/*
 * Bind a Motor to a GPIO pin and configure its PWM slice for the given
 * frequency with a 0% duty cycle. The slice is not enabled here.
 */
void motorInit(Motor *m, uint gpio, uint freq) {
    gpio_set_function(gpio, GPIO_FUNC_PWM);

    m->gpio = gpio;
    m->slice = pwm_gpio_to_slice_num(gpio);
    m->chan = pwm_gpio_to_channel(gpio);
    m->freq = freq;
    m->speed = 0;
    m->on = false;
    m->resolution = pwm_set_freq_duty(m->slice, m->chan, m->freq, m->speed);
}

/* Apply s as the duty-cycle percentage and remember it in the Motor. */
void motorspeed(Motor *m, int s) {
    const uint slice = m->slice;
    const uint chan = m->chan;

    pwm_set_duty(slice, chan, s);
    m->speed = s;
}
/* Enable the motor's PWM slice and record that it is running. */
void motorOn(Motor *m) {
    const bool running = true;

    pwm_set_enabled(m->slice, running);
    m->on = running;
}

/* Disable the motor's PWM slice and record that it is stopped. */
void motorOff(Motor *m) {
    const bool running = false;

    pwm_set_enabled(m->slice, running);
    m->on = running;
}

/* Run one motor on GPIO 21 at 2 kHz with a 50% duty cycle. */
int main() {
    Motor motor;

    motorInit(&motor, 21, 2000);
    motorspeed(&motor, 50);
    motorOn(&motor);
    return 0;
}

 

Page 157 remember to add hardware_pwm to the CMakeLists.txt file 

 
#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/pwm.h"

/*
 * Configure a PWM slice for an output frequency of f Hz and set one channel
 * to a duty cycle of d percent. A 125 MHz PWM clock is assumed.
 *
 * f == 0 is rejected (returns 0, nothing is written). Frequencies that the
 * divider/counter cannot reach and duty values outside 0..100 are clamped.
 *
 * Returns the wrap value written to the slice.
 */
uint32_t pwm_set_freq_duty(uint slice_num, uint chan, uint32_t f, int d)
{
    const uint32_t clock = 125000000;

    if (f == 0)
        return 0; /* would otherwise divide by zero */
    if (d < 0)
        d = 0;
    if (d > 100)
        d = 100;

    /* Ceiling of clock / (f * 4096), in 64 bits so f * 4096 cannot wrap. */
    const uint64_t span = (uint64_t)f * 4096u;
    uint32_t divider16 = (uint32_t)(clock / span) + (clock % span != 0);
    if (divider16 < 16)
        divider16 = 16; /* never divide by less than 1.0 */
    if (divider16 > 4095)
        divider16 = 4095; /* 8-bit integer part + 4-bit fraction */

    /* Counter ticks per period; guarded so the "- 1" cannot underflow. */
    const uint32_t ticks = clock * 16 / divider16 / f;
    uint32_t wrap = ticks > 0 ? ticks - 1 : 0;
    if (wrap > 65535)
        wrap = 65535; /* pwm_set_wrap takes a 16-bit value */

    pwm_set_clkdiv_int_frac(slice_num, divider16 / 16, divider16 & 0xF);
    pwm_set_wrap(slice_num, wrap);
    pwm_set_chan_level(slice_num, chan, wrap * (uint32_t)d / 100);
    return wrap;
}

/* Read back the wrap (top) value currently stored for the given PWM slice. */
uint32_t pwm_get_wrap(uint slice_num) {
    /* slice_num is unsigned, so only the upper bound needs checking. */
    valid_params_if(PWM, slice_num < NUM_PWM_SLICES);
    const uint32_t top = pwm_hw->slice[slice_num].top;
    return top;
}

/* Set a channel's level to d percent of the slice's current wrap value. */
void pwm_set_duty(uint slice_num, uint chan, int d) {
    const uint32_t top = pwm_get_wrap(slice_num);
    pwm_set_chan_level(slice_num, chan, top * d / 100);
}

/* State for a bidirectional motor driven by two channels of one PWM slice; filled in by BiMotorInit. */
typedef struct
{
    uint gpioForward;  /* GPIO pin driven when running forward */
    uint gpioBackward; /* GPIO pin driven when running backward */
    uint slice;        /* PWM slice, looked up from gpioForward */
    uint Fchan;        /* PWM channel of gpioForward */
    uint Bchan;        /* PWM channel of gpioBackward */
    bool forward;      /* direction flag */
    uint speed;        /* last duty-cycle percentage applied */
    uint freq;         /* PWM frequency passed to BiMotorInit */
    uint resolution;   /* wrap value returned by pwm_set_freq_duty */
    bool on;           /* true while the slice is enabled */
} BiMotor;

/*
 * Bind a BiMotor to its forward and backward pins and configure the PWM
 * slice at the given frequency with both channels at 0% duty. The slice is
 * taken from the forward pin only and is not enabled here.
 */
void BiMotorInit(BiMotor *m, uint gpioForward, uint gpioBackward, uint freq) {
    /* Forward side. */
    gpio_set_function(gpioForward, GPIO_FUNC_PWM);
    m->gpioForward = gpioForward;
    m->slice = pwm_gpio_to_slice_num(gpioForward);
    m->Fchan = pwm_gpio_to_channel(gpioForward);

    /* Backward side. */
    gpio_set_function(gpioBackward, GPIO_FUNC_PWM);
    m->gpioBackward = gpioBackward;
    m->Bchan = pwm_gpio_to_channel(gpioBackward);

    m->freq = freq;
    m->speed = 0;
    m->forward = true;
    m->on = false;

    m->resolution = pwm_set_freq_duty(m->slice, m->Fchan, m->freq, 0);
    pwm_set_duty(m->slice, m->Bchan, 0);
}

/*
 * Set the motor's direction and speed.
 *
 * m:       motor to update.
 * s:       duty-cycle percentage for the driven channel.
 * forward: true drives the forward channel, false the backward channel.
 *
 * The unused channel is set to 0% before the driven one is set to s.
 * m->forward records the direction actually requested; the reverse branch
 * must store false here, not true.
 */
void BiMotorspeed(BiMotor *m, int s, bool forward)
{
    const uint driven = forward ? m->Fchan : m->Bchan;
    const uint idle = forward ? m->Bchan : m->Fchan;

    pwm_set_duty(m->slice, idle, 0);
    pwm_set_duty(m->slice, driven, s);
    m->forward = forward;
    m->speed = s;
}

/* Enable the motor's PWM slice and record that it is running. */
void BiMotorOn(BiMotor *m) {
    const bool running = true;

    pwm_set_enabled(m->slice, running);
    m->on = running;
}

/* Disable the motor's PWM slice and record that it is stopped. */
void BiMotorOff(BiMotor *m) {
    const bool running = false;

    pwm_set_enabled(m->slice, running);
    m->on = running;
}

/*
 * Drive a bidirectional motor on GPIO 20 (forward) and 21 (backward) at
 * 2 kHz, alternating every 2 s between 50% forward and 25% backward.
 */
int main() {
    BiMotor motor;

    BiMotorInit(&motor, 20, 21, 2000);
    BiMotorOn(&motor);

    for (;;) {
        BiMotorspeed(&motor, 50, true);
        sleep_ms(2000);
        BiMotorspeed(&motor, 25, false);
        sleep_ms(2000);
    }

    return 0;
}

Page 162 remember to add hardware_pwm to the CMakeLists.txt file 

 
#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/pwm.h"

/*
 * Configure a PWM slice for an output frequency of f Hz and set one channel
 * to a duty cycle of d percent. A 125 MHz PWM clock is assumed.
 *
 * f == 0 is rejected (returns 0, nothing is written). Frequencies that the
 * divider/counter cannot reach and duty values outside 0..100 are clamped.
 *
 * Returns the wrap value written to the slice.
 */
uint32_t pwm_set_freq_duty(uint slice_num, uint chan, uint32_t f, int d)
{
    const uint32_t clock = 125000000;

    if (f == 0)
        return 0; /* would otherwise divide by zero */
    if (d < 0)
        d = 0;
    if (d > 100)
        d = 100;

    /* Ceiling of clock / (f * 4096), in 64 bits so f * 4096 cannot wrap. */
    const uint64_t span = (uint64_t)f * 4096u;
    uint32_t divider16 = (uint32_t)(clock / span) + (clock % span != 0);
    if (divider16 < 16)
        divider16 = 16; /* never divide by less than 1.0 */
    if (divider16 > 4095)
        divider16 = 4095; /* 8-bit integer part + 4-bit fraction */

    /* Counter ticks per period; guarded so the "- 1" cannot underflow. */
    const uint32_t ticks = clock * 16 / divider16 / f;
    uint32_t wrap = ticks > 0 ? ticks - 1 : 0;
    if (wrap > 65535)
        wrap = 65535; /* pwm_set_wrap takes a 16-bit value */

    pwm_set_clkdiv_int_frac(slice_num, divider16 / 16, divider16 & 0xF);
    pwm_set_wrap(slice_num, wrap);
    pwm_set_chan_level(slice_num, chan, wrap * (uint32_t)d / 100);
    return wrap;
}

/* Read back the wrap (top) value currently stored for the given PWM slice. */
uint32_t pwm_get_wrap(uint slice_num) {
    /* slice_num is unsigned, so only the upper bound needs checking. */
    valid_params_if(PWM, slice_num < NUM_PWM_SLICES);
    const uint32_t top = pwm_hw->slice[slice_num].top;
    return top;
}

/* Set a channel's level to d percent of the slice's current wrap value. */
void pwm_set_duty(uint slice_num, uint chan, int d) {
    const uint32_t top = pwm_get_wrap(slice_num);
    pwm_set_chan_level(slice_num, chan, top * d / 100);
}

/*
 * Higher-resolution variant of pwm_set_duty: d is in hundredths of a
 * percent, so 10000 corresponds to the full wrap value.
 */
void pwm_set_dutyH(uint slice_num, uint chan, int d) {
    const uint32_t top = pwm_get_wrap(slice_num);
    pwm_set_chan_level(slice_num, chan, top * d / 10000);
}

/* State for one PWM-driven servo; filled in by ServoInit. */
typedef struct
{
    uint gpio;       /* GPIO pin carrying the PWM output */
    uint slice;      /* PWM slice that serves the pin */
    uint chan;       /* PWM channel that serves the pin */
    uint speed;      /* set to 0 by ServoInit; not otherwise used in this listing */
    uint resolution; /* wrap value returned by pwm_set_freq_duty */
    bool on;         /* true while the slice is enabled */
    bool invert;     /* output polarity inversion requested at init */
} Servo;

/*
 * Bind a Servo to a GPIO pin and configure its slice for 50 Hz with an
 * initial duty of 2.50% (250 hundredths of a percent). The slice is left
 * disabled. When invert is true the polarity of this servo's channel is
 * inverted; the slice's other channel is set to non-inverted.
 */
void ServoInit(Servo *s, uint gpio, bool invert) {
    gpio_set_function(gpio, GPIO_FUNC_PWM);
    s->gpio = gpio;
    s->slice = pwm_gpio_to_slice_num(gpio);
    s->chan = pwm_gpio_to_channel(gpio);

    pwm_set_enabled(s->slice, false);
    s->on = false;
    s->speed = 0;
    s->resolution = pwm_set_freq_duty(s->slice, s->chan, 50, 0);
    pwm_set_dutyH(s->slice, s->chan, 250);

    /* Non-zero channel -> second polarity argument, zero -> first. */
    const bool second = s->chan != 0;
    pwm_set_output_polarity(s->slice, !second && invert, second && invert);
    s->invert = invert;
}

/* Enable the servo's PWM slice and record that it is running. */
void ServoOn(Servo *s) {
    const bool running = true;

    pwm_set_enabled(s->slice, running);
    s->on = running;
}

/* Disable the servo's PWM slice and record that it is stopped. */
void ServoOff(Servo *s) {
    const bool running = false;

    pwm_set_enabled(s->slice, running);
    s->on = running;
}
/*
 * Move the servo to position p. The duty cycle is p * 10 + 250 hundredths
 * of a percent, so p = 0 gives 2.50% and p = 100 gives 12.50%.
 */
void ServoPosition(Servo *s, uint p) {
    const uint duty_hundredths = p * 10 + 250;
    pwm_set_dutyH(s->slice, s->chan, duty_hundredths);
}

/* Swing a servo on GPIO 20 between positions 0 and 100 every 500 ms. */
int main() {
    Servo servo;

    ServoInit(&servo, 20, false);
    ServoOn(&servo);

    for (;;) {
        ServoPosition(&servo, 0);
        sleep_ms(500);
        ServoPosition(&servo, 100);
        sleep_ms(500);
    }

    return 0;
}

Page 173 remember to add hardware_pwm to the CMakeLists.txt file 

 
#include <stdio.h>
#include "pico/stdlib.h"

/* State for a stepper motor driven from four consecutive GPIO pins; filled in by StepperBiInit. */
typedef struct
{
    uint gpio;         /* first of the four consecutive GPIO pins */
    uint speed;        /* speed value stored by rotate */
    bool forward;      /* direction flag */
    uint32_t gpiomask; /* 0x0F shifted to the position of the four pins */
    uint phase;        /* current index into stepTable, 0..7 */
} StepperBi;

/*
 * Output patterns for the eight phases, one bit per pin (bit 3 first in the
 * listing below). Initialised with a plain brace list: initialising a
 * file-scope array from a compound literal is a compiler extension, not
 * standard C.
 */
uint32_t stepTable[8] = {0x8, 0xC, 0x4, 0x6, 0x2, 0x3, 0x1, 0x9};
/*
    {1, 0, 0, 0},
    {1, 1, 0, 0},
    {0, 1, 0, 0},
    {0, 1, 1, 0},
    {0, 0, 1, 0},
    {0, 0, 1, 1},
    {0, 0, 0, 1},
    {1, 0, 0, 1}
*/
/*
 * Set up a stepper on the four consecutive pins gpio .. gpio + 3.
 *
 * Each pin becomes an SIO output, the pin mask is recorded, and the pins are
 * driven with phase 0 of stepTable. speed starts at 0 and the direction at
 * forward.
 *
 * The mask is built from an unsigned 32-bit value so the shift cannot
 * overflow a signed int, and the initial pattern no longer goes through a
 * needless volatile local.
 */
void StepperBiInit(StepperBi *s, uint gpio)
{
    s->gpio = gpio;

    for (uint pin = gpio; pin < gpio + 4; pin++)
    {
        gpio_set_function(pin, GPIO_FUNC_SIO);
        gpio_set_dir(pin, true);
    }

    s->gpiomask = (uint32_t)0x0F << gpio;
    gpio_put_masked(s->gpiomask, stepTable[0] << gpio);

    s->phase = 0;
    s->speed = 0;
    s->forward = true;
}

/*
 * Drive the stepper's four pins with the stepTable pattern for phase p.
 * p is used directly as the table index, so it must be in 0..7.
 */
void setPhase(StepperBi *s, uint p) {
    gpio_put_masked(s->gpiomask, stepTable[p] << (s->gpio));
}

/* Advance one phase (wrapping from 7 back to 0) and output it. */
void stepForward(StepperBi *s) {
    const uint next = (s->phase + 1) % 8;

    s->phase = next;
    setPhase(s, next);
}

/*
 * Go back one phase (wrapping from 0 to 7) and output it. The unsigned
 * subtraction wraps at 2^32, a multiple of 8, so the modulo still lands on
 * the previous phase.
 */
void stepReverse(StepperBi *s) {
    const uint prev = (s->phase - 1) % 8;

    s->phase = prev;
    setPhase(s, prev);
}

/* Record the requested direction and speed; no stepping happens here. */
void rotate(StepperBi *s, bool dir, int speed) {
    s->speed = speed;
    s->forward = dir;
}

/* Step a motor on GPIO 18..21 forward, one phase every millisecond. */
int main() {
    StepperBi motor;

    stdio_init_all();
    StepperBiInit(&motor, 18);

    for (;;) {
        stepForward(&motor);
        sleep_ms(1);
    }
    return 0;
}

Page 178 remember to add hardware_pwm to the CMakeLists.txt file 

 
#include <stdio.h>
#include "pico/stdlib.h"

/* State for one stepper motor driven from four consecutive GPIO lines. */
typedef struct
{
    uint gpio;         /* first of the four GPIO lines (gpio .. gpio + 3) */
    uint speed;        /* timer period in microseconds set by rotate(), 0 when stopped */
    bool forward;      /* true: the timer callback steps forward, false: reverse */
    uint32_t gpiomask; /* 0x0F << gpio: the four lines as a bit mask */
    uint phase;        /* current index into stepTable (0..7) */
} StepperBi;

/*
 * Eight-entry drive sequence; each entry is a 4-bit pattern that is shifted
 * up to the motor's GPIO lines. A plain brace initialiser is used because
 * initialising a file-scope array from a compound literal is a GNU
 * extension rather than standard C.
 */
uint32_t stepTable[8] = {0x8, 0xC, 0x4, 0x6, 0x2, 0x3, 0x1, 0x9};
/*
    The same table written out bit by bit (leftmost column is bit 3):
    {1, 0, 0, 0},
    {1, 1, 0, 0},
    {0, 1, 0, 0},
    {0, 1, 1, 0},
    {0, 0, 1, 0},
    {0, 0, 1, 1},
    {0, 0, 0, 1},
    {1, 0, 0, 1}
*/
/*
 * Prepare stepper s on the four GPIO lines gpio .. gpio + 3: make each line
 * an SIO output, drive the first table pattern, and reset phase, speed and
 * direction.
 */
void StepperBiInit(StepperBi *s, uint gpio)
{
    s->gpio = gpio;

    for (int line = 0; line < 4; line++)
    {
        uint pin = (s->gpio) + line;
        gpio_set_function(pin, GPIO_FUNC_SIO);
        gpio_set_dir(pin, true);
    }

    s->gpiomask = 0x0F << gpio;
    volatile uint32_t firstPattern = stepTable[0] << gpio;
    gpio_put_masked(s->gpiomask, firstPattern);

    s->forward = true;
    s->speed = 0;
    s->phase = 0;
}

/* Drive the motor lines of s with table entry p (caller keeps p in 0..7). */
void setPhase(StepperBi *s, uint p)
{
    gpio_put_masked(s->gpiomask, stepTable[p] << (s->gpio));
}

/* Advance s to the next table entry, wrapping from 7 back to 0. */
void stepForward(StepperBi *s)
{
    uint next = (s->phase + 1) % 8;

    s->phase = next;
    setPhase(s, next);
}

/*
 * Step s back to the previous table entry. phase is unsigned, so 0 - 1 wraps
 * to the largest unsigned value, which is 7 modulo 8.
 */
void stepReverse(StepperBi *s)
{
    uint previous = (s->phase - 1) % 8;

    s->phase = previous;
    setPhase(s, previous);
}

bool step(struct repeating_timer *t)
{
    StepperBi *s = (StepperBi *)(t->user_data);
    if (s->forward)
    {
        stepForward(s);
    }
    else
    {
        stepReverse(s);
    }
    return true;
}
/* The single repeating timer that rotate() cancels and re-arms. */
struct repeating_timer timer;

/*
 * Set the direction and stepping rate of s.
 * The current timer is always cancelled first. A speed of 0 leaves the motor
 * stopped; otherwise a timer with period 1000000 / (4 * speed) microseconds
 * is started and calls step() with s as its user data.
 */
void rotate(StepperBi *s, bool dir, int speed)
{
    cancel_repeating_timer(&timer);
    s->forward = dir;

    if (speed != 0)
    {
        s->speed = 1000 * 1000 / (4 * speed);
        add_repeating_timer_us(s->speed, step, s, &timer);
    }
    else
    {
        s->speed = 0;
    }
}

/*
 * Timer-driven stepper demo: run the motor on GPIO 18..21 at speed 250 for
 * 100 ms, stop it for 100 ms, and repeat. s1 is static because its address
 * is handed to the timer as user data.
 */
int main()
{
    static const int speeds[] = {250, 0};
    static StepperBi s1;

    StepperBiInit(&s1, 18);
    rotate(&s1, true, 250);
    for (;;)
    {
        for (int i = 0; i < 2; i++)
        {
            rotate(&s1, true, speeds[i]);
            sleep_ms(100);
        }
    }
    return 0;
}

Page 189 remember to add hardware_spi to the CMakeLists.txt file 

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/spi.h"

/*
 * SPI transfer test on spi0 at 500 kHz using GPIO 4, 6 and 7: send one word
 * (0xAA), read one back in the same transfer and print both together with
 * the transfer count.
 */
int main()
{
    enum { SPI_BAUD = 500 * 1000 };
    uint16_t sent[1] = {0xAA};
    uint16_t received[1];

    stdio_init_all();
    spi_init(spi0, SPI_BAUD);
    gpio_set_function(4, GPIO_FUNC_SPI);
    gpio_set_function(6, GPIO_FUNC_SPI);
    gpio_set_function(7, GPIO_FUNC_SPI);
    spi_set_format(spi0, 8, SPI_CPOL_0, SPI_CPHA_0, SPI_MSB_FIRST);

    int transferred = spi_write16_read16_blocking(spi0, sent, received, 1);
    spi_deinit(spi0);
    printf(" %X %X %d ", sent[0], received[0], transferred);
}

Page 197 remember to add hardware_spi to the CMakeLists.txt file 

#include <stdio.h>

#include "pico/stdlib.h"
#include "hardware/spi.h"

/*
 * Read the chip ID and one set of raw pressure / temperature / humidity
 * values over spi0 (GPIO 16, 18, 19) with GPIO 17 driven by hand as chip
 * select. NOTE(review): the register numbers (0xD0, 0xF2, 0xF4, 0xF7) match
 * the Bosch BME280 - confirm the attached device.
 */
int main()
{
    stdio_init_all();

    spi_init(spi0, 500 * 1000);
    spi_set_format(spi0, 8, SPI_CPOL_0, SPI_CPHA_0, SPI_MSB_FIRST);

    gpio_set_function(16, GPIO_FUNC_SPI);
    gpio_set_function(18, GPIO_FUNC_SPI);
    gpio_set_function(19, GPIO_FUNC_SPI);

    /* Chip select idles high. */
    gpio_init(17);
    gpio_set_dir(17, GPIO_OUT);
    gpio_put(17, 1);
    sleep_ms(1);
    /* Two bytes are needed: the configuration writes send address + value. */
    uint8_t wBuff[2] = {0xD0, 0x00};
    uint8_t rBuff[8];

    /* Read the ID register (0xD0). */
    gpio_put(17, 0);
    sleep_us(1);
    spi_write_blocking(spi0, wBuff, 1);
    spi_read_blocking(spi0, 0, rBuff, 1);
    sleep_us(1);
    gpio_put(17, 1);
    printf("Chip ID is 0x%x\n", rBuff[0]);

    /*
     * Configure registers 0xF2 and 0xF4. On the BME280 SPI interface bit 7 of
     * the address byte is the read/write flag (0 = write), so it is cleared
     * here; sending 0xF2 / 0xF4 unchanged would be decoded as reads.
     */
    gpio_put(17, 0);
    sleep_us(1);
    wBuff[0] = 0xF2 & 0x7F;
    wBuff[1] = 0x1;
    spi_write_blocking(spi0, wBuff, 2);
    wBuff[0] = 0xF4 & 0x7F;
    wBuff[1] = 0x27;
    spi_write_blocking(spi0, wBuff, 2);
    gpio_put(17, 1);
    sleep_us(1);

    /* Burst-read the eight data registers starting at 0xF7. */
    wBuff[0] = 0xF7;
    gpio_put(17, 0);
    sleep_us(1);
    spi_write_blocking(spi0, wBuff, 1);

    spi_read_blocking(spi0, 0, rBuff, 8);
    sleep_us(1);
    gpio_put(17, 1);

    /* Pressure and temperature are 20-bit values, humidity is 16-bit. */
    uint32_t pressure = ((uint32_t)rBuff[0] << 12) | ((uint32_t)rBuff[1] << 4) | (rBuff[2] >> 4);
    uint32_t temperature = ((uint32_t)rBuff[3] << 12) | ((uint32_t)rBuff[4] << 4) | (rBuff[5] >> 4);
    uint32_t humidity = (uint32_t)rBuff[6] << 8 | rBuff[7];

    /* uint32_t is not int on every target, so print through unsigned long. */
    printf("Humidity = %lu\n", (unsigned long)humidity);
    printf("Pressure = %lu\n", (unsigned long)pressure);
    printf("Temp. = %lu\n", (unsigned long)temperature);
}

Page 203 remember to add hardware_adc to the CMakeLists.txt file 

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/adc.h"
/*
 * Read ADC inputs 0 and 1 (GPIO 26 and 27) twice a second and print each raw
 * 12-bit value together with the voltage assuming a 3.3 V reference.
 */
int main()
{
    const float conversion_factor = 3.3f / (1 << 12);

    stdio_init_all();
    adc_init();
    adc_gpio_init(26);
    adc_gpio_init(27);

    for (;;)
    {
        adc_select_input(0);
        uint16_t result = adc_read();
        printf("Raw value 0: 0x%03x, voltage: %f V\n",
               result, result * conversion_factor);

        adc_select_input(1);
        result = adc_read();
        printf("Raw value 1: 0x%03x, voltage: %f V\n",
               result, result * conversion_factor);

        sleep_ms(500);
    }
}

Page 205 remember to add hardware_adc to the CMakeLists.txt file 

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/adc.h"

/*
 * Free-running ADC demo: sample inputs 0 and 1 round-robin into the ADC
 * FIFO, then repeatedly print one FIFO entry and the current FIFO level.
 */
int main()
{
    const float conversion_factor = 3.3f / (1 << 12);

    stdio_init_all();

    adc_init();
    adc_gpio_init(26);
    adc_gpio_init(27);
    adc_set_round_robin(0x03);
    adc_fifo_setup(true, false, 0, false, false);
    adc_run(true);

    for (;;)
    {
        uint16_t sample = adc_fifo_get();
        printf("Raw value 0: 0x%03x, voltage: %f V\n", sample, sample * conversion_factor);

        int fifoLevel = adc_fifo_get_level();
        printf("level: %d \n", fifoLevel);
    }
}

Page 211 remember to add hardware_spi to the CMakeLists.txt file 

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/spi.h"

/*
 * Read one sample from an SPI ADC on spi0 (GPIO 16, 18, 19, chip select on
 * GPIO 17): send the three command words 0x01, 0x80, 0x00 and combine the
 * low two bits of the second reply with the third reply into a 10-bit value.
 */
int main()
{
    enum { CS_PIN = 17 };
    uint16_t command[3] = {0x01, 0x80, 0x00};
    uint16_t reply[3];

    stdio_init_all();
    spi_init(spi0, 500 * 1000);
    spi_set_format(spi0, 8, SPI_CPOL_0, SPI_CPHA_0, SPI_MSB_FIRST);
    gpio_set_function(16, GPIO_FUNC_SPI);
    gpio_set_function(18, GPIO_FUNC_SPI);
    gpio_set_function(19, GPIO_FUNC_SPI);

    gpio_init(CS_PIN);
    gpio_set_dir(CS_PIN, GPIO_OUT);
    gpio_put(CS_PIN, 1);
    sleep_ms(1);

    gpio_put(CS_PIN, 0);
    int n = spi_write16_read16_blocking(spi0, command, reply, 3);
    sleep_us(1);
    gpio_put(CS_PIN, 1);

    int data = ((int)reply[1] & 0x03) << 8 | (int)reply[2];
    /* Full scale (1023) corresponds to 3.3 V. */
    float volts = (float)data * 3.3f / 1023.0f;
    printf("%f V\n", volts);
}

Page 228 remember to add hardware_i2c to the CMakeLists.txt file 

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/i2c.h"

/*
 * Read one register from the I2C device at address 0x40 on i2c0 (GPIO 4 and
 * 5, 100 kHz): write the command byte 0xE7, read one byte back, print it.
 */
int main()
{
    enum { DEVICE_ADDR = 0x40 };
    uint8_t buf[] = {0xE7};

    stdio_init_all();

    i2c_init(i2c0, 100 * 1000);

    gpio_set_function(4, GPIO_FUNC_I2C);
    gpio_set_function(5, GPIO_FUNC_I2C);

    i2c_write_blocking(i2c0, DEVICE_ADDR, buf, 1, false);
    i2c_read_blocking(i2c0, DEVICE_ADDR, buf, 1, false);
    printf("User Register = %X \r\n", buf[0]);
}

Page 230 remember to add hardware_i2c to the CMakeLists.txt file 

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/i2c.h"

/*
 * Send command 0xE3 to the I2C device at address 0x40 on i2c0 (GPIO 4 and 5,
 * 100 kHz), keeping the bus (no stop) for the following three-byte read, and
 * print the returned msb, lsb and checksum bytes.
 */
int main()
{
    stdio_init_all();

    i2c_init(i2c0, 100 * 1000);

    gpio_set_function(4, GPIO_FUNC_I2C);
    gpio_set_function(5, GPIO_FUNC_I2C);

    uint8_t buf[4] = {0xE3};
    i2c_write_blocking(i2c0, 0x40, buf, 1, true);
    i2c_read_blocking(i2c0, 0x40, buf, 3, false);
    uint8_t msb = buf[0];
    uint8_t lsb = buf[1];
    uint8_t check = buf[2];
    printf("msb %d \n\r lsb %d \n\r checksum %d \n\r", msb, lsb, check);
}

Page 234 remember to add hardware_i2c to the CMakeLists.txt file 

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/i2c.h"
/*
 * Verify an 8-bit CRC over two data bytes.
 * msb, lsb and check are joined into one 24-bit value which is divided
 * (XOR long division) by the 9-bit polynomial 0x131, starting aligned with
 * bit 23 (0x131 << 15 == 0x988000). The low 8 bits of what is left are
 * returned: 0 when check matches the data.
 */
uint8_t crcCheck(uint8_t msb, uint8_t lsb, uint8_t check)
{
    uint32_t remainder = ((uint32_t)msb << 16) | ((uint32_t)lsb << 8) | (uint32_t)check;
    uint32_t poly = 0x988000;
    uint32_t probe = (uint32_t)1 << 23;

    /* Sixteen steps: probe walks from bit 23 down to bit 8. */
    while (probe > 0x80)
    {
        if (remainder & probe)
        {
            remainder ^= poly;
        }
        poly >>= 1;
        probe >>= 1;
    }
    return (uint8_t)remainder;
}
/*
 * Read temperature (command 0xE3) and humidity (command 0xF5) from the I2C
 * device at address 0x40 on i2c0 (GPIO 4 and 5, 100 kHz). For each reading
 * the raw bytes and the crcCheck() result are printed, then the converted
 * value. After 0xF5 the read is retried every millisecond until it stops
 * returning a negative result.
 */
int main()
{
    enum { DEVICE_ADDR = 0x40 };

    stdio_init_all();

    i2c_init(i2c0, 100 * 1000);

    gpio_set_function(4, GPIO_FUNC_I2C);
    gpio_set_function(5, GPIO_FUNC_I2C);

    /* Temperature. */
    uint8_t buf[4] = {0xE3};
    i2c_write_blocking(i2c0, DEVICE_ADDR, buf, 1, true);
    i2c_read_blocking(i2c0, DEVICE_ADDR, buf, 3, false);
    uint8_t msb = buf[0];
    uint8_t lsb = buf[1];
    uint8_t check = buf[2];
    printf("msb %d \n\r lsb %d \n\r checksum %d \n\r", msb, lsb, check);
    /* The two lowest bits of lsb are masked off before conversion. */
    unsigned int data16 = ((unsigned int)msb << 8) | (unsigned int)(lsb & 0xFC);
    printf("crc = %d\n\r", crcCheck(msb, lsb, check));
    float temp = (float)(-46.85 + (175.72 * data16 / (float)65536));
    printf("Temperature %f C \n\r", temp);

    /* Humidity. */
    buf[0] = 0xF5;
    i2c_write_blocking(i2c0, DEVICE_ADDR, buf, 1, true);
    for (;;)
    {
        if (i2c_read_blocking(i2c0, DEVICE_ADDR, buf, 3, false) >= 0)
        {
            break;
        }
        sleep_ms(1);
    }

    msb = buf[0];
    lsb = buf[1];
    check = buf[2];
    printf("msb %d \n\r lsb %d \n\r checksum %d \n\r", msb, lsb, check);
    printf("crc = %d\n\r", crcCheck(msb, lsb, check));
    data16 = ((unsigned int)msb << 8) | (unsigned int)(lsb & 0xFC);
    float hum = -6 + (125.0 * (float)data16) / 65536;
    printf("Humidity %f %% \n\r", hum);
}

Page 240

; Toggle the first SET pin continuously. Each instruction takes one state
; machine cycle, so the pin is high for one cycle and low for two (the
; second `set` plus the `jmp`).
.program squarewave
    set pindirs, 1   ; Set pin to output
again:
    set pins, 1      ; Drive pin high
    set pins, 0      ; Drive pin low
    jmp again        ; Set PC to label `again`

Page 241

cmake_minimum_required(VERSION 3.13)

set(CMAKE_C_STANDARD 11)
set(CMAKE_CXX_STANDARD 17)

# The SDK import script has to be included before project().
include(pico_sdk_import.cmake)

project(pioBlinky C CXX ASM)

# pico_sdk_init() must run after project() has created the project and
# enabled its languages.
pico_sdk_init()

add_executable(pio
 pio.c
)
# Assemble sqwave.pio into the sqwave.pio.h header included by pio.c.
pico_generate_pio_header(pio ${CMAKE_CURRENT_LIST_DIR}/sqwave.pio)

target_link_libraries(pio  pico_stdlib hardware_pio)
pico_add_extra_outputs(pio)

Page 189 remember to use the CMakeLists.txt file and the PIO program given earlier 

#include "pico/stdlib.h"
#include "hardware/pio.h"
#include "sqwave.pio.h"

int main()
{
    uint offset = pio_add_program(pio0, &squarewave_program);

    uint sm = pio_claim_unused_sm(pio0, true);
    pio_sm_config c = squarewave_program_get_default_config(offset);

    sm_config_set_set_pins(&c, 2, 1);
    pio_gpio_init(pio0, 2);

    pio_sm_init(pio0, sm, offset, &c);
    pio_sm_set_enabled(pio0, sm, true);
    return 0;
}

Page 247-248

PIO Program
; Square wave whose high and low times are set by a count read once from
; the TX FIFO.
.program squarewave
    set pindirs, 1  
    ; Block until the count arrives; it is left in OSR and reused below.
    pull block
again:
    set pins, 1 
    ; Copy the count into x and spin until x has counted down to zero.
    mov x, osr
 loop1: 
     jmp x--,loop1
    set pins, 0 
    ; Same delay for the low half of the wave.
    mov x, osr
 loop2:  
   jmp x--,loop2 
jmp again
C Program
#include "pico/stdlib.h"
#include "hardware/pio.h"

#include "sqwave.pio.h"

int main()
{

    uint offset = pio_add_program(pio0, &squarewave_program);

    uint sm = pio_claim_unused_sm(pio0, true);
    pio_sm_config c = squarewave_program_get_default_config(offset);

    sm_config_set_set_pins(&c, 2, 1);
    pio_gpio_init(pio0, 2);

    sm_config_set_clkdiv_int_frac(&c, 255, 0);
    pio_sm_init(pio0, sm, offset, &c);
    pio_sm_set_enabled(pio0, sm, true);

    pio_sm_put_blocking(pio0, sm, 0xFFFF);
    return 0;
}

Page 249-250

PIO Program

; Send the word taken from the TX FIFO to two pins, two bits at a time.
.program squarewave

    ; 3 = binary 11: both SET pins become outputs.
    set pindirs, 3  
    ; Block until one word arrives from the TX FIFO.
    pull block    
again:
    ; Shift two bits out of OSR onto the OUT pins on every pass.
    out pins,2
    jmp again 
C Program
#include "pico/stdlib.h"
#include "hardware/pio.h"
#include "sqwave.pio.h"

int main()
{

    uint offset = pio_add_program(pio0, &squarewave_program);

    uint sm = pio_claim_unused_sm(pio0, true);
    pio_sm_config c = squarewave_program_get_default_config(offset);
    sm_config_set_set_pins(&c, 2, 2);
    sm_config_set_out_pins(&c, 2, 2);
    pio_gpio_init(pio0, 2);
    pio_gpio_init(pio0, 3);

    sm_config_set_clkdiv_int_frac(&c, 255, 0);
    pio_sm_init(pio0, sm, offset, &c);
    pio_sm_set_enabled(pio0, sm, true);

    pio_sm_put_blocking(pio0, sm, 0xFEDCBA98);
    return 0;
}

Page 252

PIO Program

; Shift two bits at a time from OSR onto the OUT pins, forever. There is no
; pull instruction here: the accompanying C program sets pin directions and
; enables autopull through sm_config_set_out_shift().
.program squarewave   
again:
  out pins,2
  jmp again 
C Program
#include "pico/stdlib.h"
#include "hardware/pio.h"
#include "sqwave.pio.h"

int main()
{
    uint offset = pio_add_program(pio0, &squarewave_program);

    uint sm = pio_claim_unused_sm(pio0, true);
    pio_sm_config c = squarewave_program_get_default_config(offset);
    sm_config_set_set_pins(&c, 2, 2);
    sm_config_set_out_pins(&c, 2, 2);
    pio_gpio_init(pio0, 2);
    pio_gpio_init(pio0, 3);
    pio_sm_set_consecutive_pindirs(pio0, sm, 2, 2, true);
    sm_config_set_clkdiv_int_frac(&c, 255, 0);
    sm_config_set_out_shift(&c, true, true, 6);
    pio_sm_init(pio0, sm, offset, &c);
    pio_sm_set_enabled(pio0, sm, true);
    while (true)
    {
        pio_sm_put_blocking(pio0, sm, 0xFEDCBA98);
    }
    return 0;
}

Page  254

PIO Program
; Square wave produced purely with side-set: the pin is driven as a side
; effect of the two instructions in the loop.
.program squarewave   
; One optional side-set bit per instruction.
.side_set 1 opt
again:
  ; Side-set pin high while doing nothing else.
  nop side 1
  ; Side-set pin low while jumping back.
  jmp  again side 0 

C Program

#include "pico/stdlib.h"
#include "hardware/pio.h"
#include "sqwave.pio.h"

int main()
{
    uint offset = pio_add_program(pio0, &squarewave_program);

    uint sm = pio_claim_unused_sm(pio0, true);
    pio_sm_config c = squarewave_program_get_default_config(offset);
    sm_config_set_sideset_pins(&c, 2);
    pio_gpio_init(pio0, 2);
    pio_sm_set_consecutive_pindirs(pio0, sm, 2, 1, true);
    sm_config_set_clkdiv_int_frac(&c, 255, 0);
    pio_sm_init(pio0, sm, offset, &c);

    pio_sm_set_enabled(pio0, sm, true);
    return 0;
}

Page 257

PIO Program
; Sample the first IN pin and hand every sample to the CPU.
.program light  
again:
  ; Shift one bit from the IN pin into ISR.
  in pins,1
  ; Push ISR to the RX FIFO, stalling while the FIFO is full.
  push block
  jmp  again  

C Program

#include "pico/stdlib.h"
#include "hardware/pio.h"
#include "light.pio.h"

int main()
{

    uint offset = pio_add_program(pio0, &light_program);

    uint sm = pio_claim_unused_sm(pio0, true);
    pio_sm_config c = light_program_get_default_config(offset);

    sm_config_set_in_pins(&c, 2);
    pio_gpio_init(pio0, 2);

    pio_sm_set_consecutive_pindirs(pio0, sm, 2, 1, false);
    sm_config_set_clkdiv_int_frac(&c, 255, 0);

    pio_sm_init(pio0, sm, offset, &c);
    pio_sm_set_enabled(pio0, sm, true);

    gpio_init(25);
    gpio_set_dir(25, GPIO_OUT);
    while (true)
    {
        uint32_t flag = pio_sm_get_blocking(pio0, sm);
        if (flag == 0)
        {
            gpio_put(25, 0);
        }
        else
        {
            gpio_put(25, 1);
        }
    }

    return 0;
}

Page 259

PIO Program

; Emit a short pulse on the SET pin each time the IN pin goes from low to
; high.
.program squarewave  
again:
   ; Wait for the input to be low, then for it to go high.
   wait 0 pin 0
   wait 1 pin 0
   ; One-cycle-high pulse on the SET pin.
   set pins, 1  
   set pins, 0  
jmp  again
  
C Program
#include "pico/stdlib.h"
#include "hardware/pio.h"
#include "sqwave.pio.h"

int main()
{

    uint offset = pio_add_program(pio0, &squarewave_program);
    uint sm = pio_claim_unused_sm(pio0, true);

    pio_sm_config c = squarewave_program_get_default_config(offset);
    sm_config_set_set_pins(&c, 3, 1);
    sm_config_set_in_pins(&c, 2);

    pio_gpio_init(pio0, 2);
    pio_gpio_init(pio0, 3);
    pio_sm_set_consecutive_pindirs(pio0, sm, 2, 1, false);
    pio_sm_set_consecutive_pindirs(pio0, sm, 3, 1, true);
    sm_config_set_clkdiv_int_frac(&c, 255, 0);

    pio_sm_init(pio0, sm, offset, &c);
    pio_sm_set_enabled(pio0, sm, true);

    return 0;
}

Page 189

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/gpio.h"

int main()
{
    stdio_init_all();
    gpio_init(2);
    gpio_set_dir(2, GPIO_OUT);
    gpio_put(2, 1);
    sleep_ms(1);
    gpio_put(2, 0);
    sleep_ms(1);
    gpio_set_dir(2, GPIO_IN);
    return 0;
}

Page 267

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/gpio.h"

int main()
{
    stdio_init_all();
    gpio_init(2);
    gpio_set_dir(2, GPIO_OUT);
    gpio_put(2, 1);
    sleep_ms(1);
    gpio_put(2, 0);
    sleep_ms(1);
    gpio_set_dir(2, GPIO_IN);
    return 0;
}

Page 270 

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/gpio.h"

/*
 * Busy-wait on gpio: first until the line is no longer high, then until it
 * is no longer low. The function therefore returns once the line has gone
 * low and come back high.
 */
inline static void WaitFallingEdge(uint gpio)
{
    while (gpio_get(gpio))
    {
    }
    while (!gpio_get(gpio))
    {
    }
}

/*
 * Read 32 data bits from the sensor on gpio, most significant bit first.
 * Each bit is timed from one WaitFallingEdge() return to the next; an
 * interval longer than 100 us is taken as a 1, anything shorter as a 0.
 * The gpio argument is now honoured (the pin was previously fixed at 2).
 */
uint32_t getData(uint gpio)
{
    uint32_t t2;
    uint32_t data = 0;
    uint32_t t1 = time_us_32();
    for (int i = 0; i < 32; i++)
    {
        WaitFallingEdge(gpio);
        t2 = time_us_32();
        data = data << 1;
        data = data | ((t2 - t1) > 100);
        t1 = t2;
    }
    return data;
}
/*
 * Read the 8 checksum bits from the sensor on gpio, most significant bit
 * first, using the same timing rule as getData(): an interval longer than
 * 100 us is a 1. The gpio argument is now honoured (the pin was previously
 * fixed at 2).
 */
uint8_t getCheck(uint gpio)
{
    uint8_t checksum = 0;
    uint32_t t2;
    uint32_t t1 = time_us_32();
    for (int i = 0; i < 8; i++)
    {
        WaitFallingEdge(gpio);
        t2 = time_us_32();
        checksum = checksum << 1;
        checksum = checksum | ((t2 - t1) > 100);
        t1 = t2;
    }
    return checksum;
}

/*
 * Start a reading on gpio: drive the line high for 1 ms and low for 1 ms,
 * release it by switching to input, then let two WaitFallingEdge() cycles
 * pass before the data bits are read.
 */
void dhtInitalize(uint gpio)
{
    gpio_init(gpio);
    gpio_set_dir(gpio, GPIO_OUT);

    gpio_put(gpio, 1);
    sleep_ms(1);
    gpio_put(gpio, 0);
    sleep_ms(1);

    gpio_set_dir(gpio, GPIO_IN);
    WaitFallingEdge(gpio);
    WaitFallingEdge(gpio);
}

/* One decoded sensor reading, filled in by dhtread(). */
typedef struct
{
    float temperature; /* raw 15-bit magnitude / 10, negated when the sign bit is set */
    float humidity;    /* raw 16-bit value / 10 */
    bool error;        /* true when the received checksum does not match the data bytes */
} dhtData;

/*
 * Take one reading from the sensor on gpio and decode it into *reading.
 * The 32 data bits are split into four bytes: bytes 1-2 form the humidity,
 * bytes 3-4 the temperature (top bit of byte 3 is the sign). error is set
 * when the low 8 bits of the byte sum differ from the received checksum.
 */
void dhtread(uint gpio, dhtData *reading)
{
    dhtInitalize(gpio);
    uint32_t raw = getData(gpio);
    uint8_t checksum = getCheck(gpio);

    uint8_t humHigh = (raw >> 24 & 0xFF);
    uint8_t humLow = (raw >> 16 & 0xFF);
    uint8_t tempHigh = (raw >> 8 & 0xFF);
    uint8_t tempLow = (raw & 0xFF);

    uint8_t sum = (humHigh + humLow + tempHigh + tempLow) & 0xFF;
    reading->error = (checksum != sum);
    reading->humidity = (float)((humHigh << 8) | humLow) / 10.0;

    bool negative = (tempHigh & 0x80) != 0;
    float magnitude = (float)((tempHigh & 0x7F) << 8 | tempLow) / 10.0;
    reading->temperature = negative ? -magnitude : magnitude;
}

/*
 * Take one reading from the sensor on GPIO 2 and print humidity and
 * temperature. The percent sign is written as "%%": a lone '%' followed by
 * a newline is not a valid printf conversion.
 */
int main()
{
    stdio_init_all();
    printf("data^^^^^\n");
    dhtData reading;
    dhtread(2, &reading);
    printf("Humidity= %f %%\n", reading.humidity);
    printf("Temperature= %f C\n", reading.temperature);
    return 0;
}

Page 275

PIO Program

; DHT reader that measures how long each pulse stays high by counting in x.
.program dht
    set pins, 1 
    set pindirs, 1    
again:
  ; Wait for the CPU to send the start-pulse length (a loop count).
  pull block
  ; Hold the line low for that many loop iterations.
  set pins, 0
mov x, osr
loop1: 
    jmp x--,loop1
; Release the line (pin becomes an input) and wait for it to go high, low,
; high, low before the data bits start.
set pindirs, 0 
wait 1 pin 0
wait 0 pin 0
wait 1 pin 0
wait 0 pin 0

    ; 32 data bits (y counts 31 down to 0).
    set y,31
bits:
    wait 1 pin 0
    set x, 0
    ; Decrement x once per pass for as long as the jmp pin stays high.
loop2:
        jmp x--,continue
continue: jmp pin,loop2 
        ; Shift the low four bits of the count into ISR.
        in x,4  
    jmp y--,bits

    ; 8 checksum bits, measured the same way.
    set y,7
check:
    wait 1 pin 0
    set x, 0
loop3:
        jmp x--,continue2
continue2: jmp pin,loop3
        in x,4  
    jmp y--,check
jmp again

C Program

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/gpio.h"
#include "hardware/pio.h"

#include "DHT.pio.h"
/*
 * Load the dht PIO program into pio, claim a free state machine and
 * configure gpio as its SET, IN and JMP pin (clock divider 128, input shift
 * to the right with autopush at 32 bits). Returns the state machine number.
 * The state machine is now initialised and enabled on the pio that was
 * passed in (previously pio0 regardless of the argument).
 */
uint dhtInitalize(PIO pio, int gpio)
{

    uint offset = pio_add_program(pio, &dht_program);

    uint sm = pio_claim_unused_sm(pio, true);
    pio_gpio_init(pio, gpio);
    pio_sm_config c = dht_program_get_default_config(offset);
    sm_config_set_clkdiv_int_frac(&c, 128, 0);

    sm_config_set_set_pins(&c, gpio, 1);
    sm_config_set_in_pins(&c, gpio);

    sm_config_set_jmp_pin(&c, gpio);
    sm_config_set_in_shift(&c, true, true, 32);
    pio_sm_init(pio, sm, offset, &c);

    pio_sm_set_enabled(pio, sm, true);
    return sm;
}

/*
 * Fetch one 32-bit word from state machine sm of pio and turn it into a
 * byte. The word holds eight 4-bit pulse counts; nibble 0 becomes the most
 * significant bit, and a nibble greater than 8 counts as a 1. The FIFO is
 * now read from the pio that was passed in (previously always pio0).
 */
uint8_t getByte(PIO pio, uint sm)
{
    uint32_t count = pio_sm_get_blocking(pio, sm);
    uint8_t byte = 0;
    for (int i = 0; i < 8; i++)
    {
        byte = byte << 1;
        if (((count >> i * 4) & 0x0F) > 8)
        {
            byte = byte | 1;
        }
    }
    return byte;
}

/* One decoded sensor reading, filled in by dhtread(). */
typedef struct
{
    float temperature; /* raw 15-bit magnitude / 10, negated when the sign bit is set */
    float humidity;    /* raw 16-bit value / 10 */
    bool error;        /* checksum status flag set by dhtread() */
} dhtData;

/*
 * Trigger one reading through state machine sm of pio and decode it into
 * *reading. The value 1000 sent first is the start-pulse loop count used by
 * the PIO program. Four data bytes and a checksum byte are then fetched.
 *
 * error is true when the low 8 bits of the byte sum differ from the
 * checksum. The earlier expression `checksum == (sum) & 0xFF` parsed as
 * `(checksum == sum) & 0xFF`, so it reported an error exactly when the
 * unmasked sum matched and never masked the sum at all.
 */
void dhtread(PIO pio, uint sm, dhtData *reading)
{

    pio_sm_put_blocking(pio, sm, 1000);

    uint8_t byte1 = getByte(pio, sm);
    uint8_t byte2 = getByte(pio, sm);
    uint8_t byte3 = getByte(pio, sm);
    uint8_t byte4 = getByte(pio, sm);

    uint8_t checksum = getByte(pio, sm);

    reading->error = (checksum != ((byte1 + byte2 + byte3 + byte4) & 0xFF));
    reading->humidity = (float)((byte1 << 8) | byte2) / 10.0;

    /* Top bit of byte3 is the sign; the remaining 15 bits are the magnitude. */
    int neg = byte3 & 0x80;
    byte3 = byte3 & 0x7F;
    reading->temperature = (float)(byte3 << 8 | byte4) / 10.0;
    if (neg > 0)
        reading->temperature = -reading->temperature;
}

/*
 * Set up the PIO DHT reader on GPIO 2 of pio0, take one reading and print
 * it. The percent sign is written as "%%": a lone '%' followed by a newline
 * is not a valid printf conversion.
 */
int main()
{
    stdio_init_all();
    uint sm = dhtInitalize(pio0, 2);
    dhtData reading;
    dhtread(pio0, sm, &reading);
    printf("Humidity= %f %%\n", reading.humidity);
    printf("Temperature= %f C\n", reading.temperature);
    return 0;
}

Page 280

PIO Program
; DHT reader that samples the line a fixed delay after each rising edge.
.program dht
 set pins, 1 
 set pindirs, 1 
again:
 ; Wait for the CPU to send the start-pulse length (a loop count).
 pull block
 ; Hold the line low for that many loop iterations.
 set pins, 0
mov x, osr
loop1: 
 jmp x--,loop1
; Release the line (pin becomes an input) and wait for it to go high, low,
; high, low before the data bits start.
set pindirs, 0 
wait 1 pin 0
wait 0 pin 0
wait 1 pin 0
wait 0 pin 0

; 32 data bits (y counts 31 down to 0).
set y,31
bits:
 ; After the rising edge, wait 25 more cycles and then sample the pin: the
 ; sampled level is the bit value.
 wait 1 pin 0 [25]
 in pins,1 
 wait 0 pin 0
 jmp y--,bits

 ; 8 checksum bits, sampled the same way.
 set y,7
check:
wait 1 pin 0 [25]
 in pins,1 
 wait 0 pin 0 
 jmp y--,check
; Explicitly push the 8 checksum bits held in ISR.
push block
jmp again

C Program

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/gpio.h"
#include "hardware/pio.h"

#include "DHT.pio.h"
/*
 * Load the dht PIO program into pio, claim a free state machine and
 * configure gpio as its SET and IN pin (clock divider 255, input shift to
 * the left with autopush at 32 bits). Returns the state machine number.
 * The state machine is now initialised and enabled on the pio that was
 * passed in (previously pio0 regardless of the argument).
 */
uint dhtInitalize(PIO pio, int gpio)
{

    uint offset = pio_add_program(pio, &dht_program);

    uint sm = pio_claim_unused_sm(pio, true);
    pio_gpio_init(pio, gpio);

    pio_sm_config c = dht_program_get_default_config(offset);
    sm_config_set_clkdiv_int_frac(&c, 255, 0);
    sm_config_set_set_pins(&c, gpio, 1);
    sm_config_set_in_pins(&c, gpio);
    sm_config_set_in_shift(&c, false, true, 32);
    pio_sm_init(pio, sm, offset, &c);

    pio_sm_set_enabled(pio, sm, true);
    return sm;
}

/* One decoded sensor reading, filled in by dhtread(). */
typedef struct
{
    float temperature; /* raw 15-bit magnitude / 10, negated when the sign bit is set */
    float humidity;    /* raw 16-bit value / 10 */
    bool error;        /* true when the received checksum does not match the data bytes */
} dhtData;

/*
 * Trigger one reading through state machine sm of pio and decode it into
 * *reading. The value 500 sent first is the start-pulse loop count used by
 * the PIO program. The first word received holds the four data bytes, the
 * second holds the checksum in its low 8 bits. error is true when the low
 * 8 bits of the byte sum differ from the checksum. Both words are now read
 * from the pio that was passed in (previously always pio0).
 */
void dhtread(PIO pio, uint sm, dhtData *reading)
{
    pio_sm_put_blocking(pio, sm, 500);

    uint32_t data = pio_sm_get_blocking(pio, sm);
    uint8_t byte1 = (data >> 24 & 0xFF);
    uint8_t byte2 = (data >> 16 & 0xFF);
    uint8_t byte3 = (data >> 8 & 0xFF);
    uint8_t byte4 = (data & 0xFF);
    uint8_t checksum = pio_sm_get_blocking(pio, sm) & 0xFF;

    reading->error = (checksum != ((byte1 + byte2 + byte3 + byte4) & 0xFF));
    reading->humidity = (float)((byte1 << 8) | byte2) / 10.0;

    /* Top bit of byte3 is the sign; the remaining 15 bits are the magnitude. */
    int neg = byte3 & 0x80;
    byte3 = byte3 & 0x7F;
    reading->temperature = (float)(byte3 << 8 | byte4) / 10.0;
    if (neg > 0)
        reading->temperature = -reading->temperature;
}

/*
 * Set up the PIO DHT reader on GPIO 2 of pio0, take one reading and print
 * it. The percent sign is written as "%%": a lone '%' followed by a newline
 * is not a valid printf conversion.
 */
int main()
{
    stdio_init_all();
    uint sm = dhtInitalize(pio0, 2);
    dhtData reading;
    dhtread(pio0, sm, &reading);
    printf("Humidity= %f %%\n", reading.humidity);
    printf("Temperature= %f C\n", reading.temperature);
    return 0;
}
CMakeLists.txt
# Build file for the PIO DHT reader: DHT.c plus the header generated from
# DHT.pio.
cmake_minimum_required(VERSION 3.13)

set(CMAKE_C_STANDARD 11)
set(CMAKE_CXX_STANDARD 17)

# Pull in the SDK import script before the project is declared.
include(pico_sdk_import.cmake)


project(dht C CXX ASM)
# Initialise the SDK once the project exists.
pico_sdk_init()
add_executable(dht
 DHT.c
)
# Assemble DHT.pio into the DHT.pio.h header included by DHT.c.
pico_generate_pio_header(dht ${CMAKE_CURRENT_LIST_DIR}/DHT.pio)
target_link_libraries(dht  pico_stdlib hardware_pio)
pico_add_extra_outputs(dht)

Page 300

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/gpio.h"

/*
 * Send a reset pulse on pin and sample the line for a presence response.
 * The line is driven high for 1 ms, low for 480 us, then released; it is
 * sampled 70 us later and the slot is completed with a further 410 us wait.
 * Returns the sampled level: callers treat 1 as "no device".
 * The direction change now applies to pin (it was previously fixed at
 * GPIO 2 regardless of the argument).
 */
int presence(uint8_t pin)
{
    gpio_set_dir(pin, GPIO_OUT);
    gpio_put(pin, 1);
    sleep_ms(1);
    gpio_put(pin, 0);
    sleep_us(480);
    gpio_set_dir(pin, GPIO_IN);
    sleep_us(70);
    int b = gpio_get(pin);
    sleep_us(410);
    return b;
}

void writeBit(uint8_t pin, int b)
{
    int delay1, delay2;
    if (b == 1)
    {
        delay1 = 6;
        delay2 = 64;
    }
    else
    {
        delay1 = 60;
        delay2 = 10;
    }
    gpio_set_dir(pin, GPIO_OUT);
    gpio_put(pin, 0);
    sleep_us(delay1);
    gpio_set_dir(pin, GPIO_IN);
    sleep_us(delay2);
}

/* Write the low eight bits of byte on pin, least significant bit first. */
void writeByte(uint8_t pin, int byte)
{
    for (int sent = 0; sent < 8; sent++, byte = byte >> 1)
    {
        writeBit(pin, (byte & 1) ? 1 : 0);
    }
}

/*
 * Read one bit from pin: pull the line low for 8 us, release it, sample
 * 2 us later and then wait 60 us for the rest of the slot.
 */
uint8_t readBit(uint8_t pin)
{
    uint8_t level;

    gpio_set_dir(pin, GPIO_OUT);
    gpio_put(pin, 0);
    sleep_us(8);

    gpio_set_dir(pin, GPIO_IN);
    sleep_us(2);
    level = gpio_get(pin);

    sleep_us(60);
    return level;
}

/* Read eight bits from pin, least significant bit first, into one value. */
int readByte(uint8_t pin)
{
    int value = 0;

    for (int bit = 0; bit < 8; bit++)
    {
        value |= readBit(pin) << bit;
    }
    return value;
}

/*
 * Send command 0x44 and poll the device every 10 ms until it answers a read
 * slot with 1. Returns the number of completed polls before that happened,
 * or 500 if it never did.
 */
int convert(uint8_t pin)
{
    int polls = 0;

    writeByte(pin, 0x44);
    while (polls < 500)
    {
        sleep_ms(10);
        if (readBit(pin) == 1)
        {
            return polls;
        }
        polls++;
    }
    return polls;
}

/*
 * Bitwise CRC-8 over len bytes of data, processed least significant bit
 * first with the reflected polynomial 0x8C and an initial value of 0.
 * Running it over a message followed by its own CRC byte yields 0.
 */
uint8_t crc8(uint8_t *data, uint8_t len)
{
    uint8_t crc = 0;

    for (uint8_t n = 0; n < len; n++)
    {
        uint8_t in = data[n];

        for (uint8_t remaining = 8; remaining > 0; remaining--)
        {
            uint8_t feedback = (crc ^ in) & 0x01;

            crc >>= 1;
            in >>= 1;
            if (feedback)
            {
                crc ^= 0x8C;
            }
        }
    }

    return crc;
}

/*
 * Run one temperature conversion on the device on pin and return the result
 * (raw signed 16-bit value / 16).
 * Error returns: -1000 no presence response, -3000 conversion timed out,
 * -2000 CRC over the nine bytes read back was not zero.
 */
float getTemperature(uint8_t pin)
{
    uint8_t scratch[9];

    if (presence(pin) == 1)
    {
        return -1000;
    }
    writeByte(pin, 0xCC);
    if (convert(pin) == 500)
    {
        return -3000;
    }

    /* Reset again, then send 0xCC followed by 0xBE and read nine bytes. */
    presence(pin);
    writeByte(pin, 0xCC);
    writeByte(pin, 0xBE);
    for (int n = 0; n < 9; n++)
    {
        scratch[n] = readByte(pin);
    }

    if (crc8(scratch, 9) != 0)
    {
        return -2000;
    }

    int low = scratch[0];
    int high = scratch[1];
    int16_t raw = (high << 8 | low);
    return (float)raw / 16;
}

/*
 * Print the temperature from the device on GPIO 2 every 500 ms. A reading
 * below -999 is one of getTemperature()'s error codes and is retried
 * immediately.
 */
int main()
{
    const uint8_t pin = 2;
    float t;

    stdio_init_all();
    gpio_init(pin);

    if (presence(pin) == 1)
    {
        printf("No device \n");
    }

    while (true)
    {
        t = getTemperature(pin);
        while (t < -999)
        {
            t = getTemperature(pin);
        }
        printf("%f\r\n", t);
        sleep_ms(500);
    }

    return 0;
}

Page 309

PIO Program

; One-wire byte writer/reader. The first word from the TX FIFO selects the
; operation: zero means read, non-zero means write, and the value is then
; used as the loop count for the initial low pulse.
.program DS1820 
.wrap_target
again:
  pull block
  mov x, osr
  jmp !x, read

; Write: hold the line low for x loop iterations, then release it and wait
; for it to read high.
write:  set pindirs, 1 
  set pins, 0  
loop1: 
    jmp x--,loop1
set pindirs, 0 [31]
wait 1 pin 0 [31]

  ; Next word: number of bytes to write, minus one.
  pull block
  mov x, osr
bytes1:
   ; One word per byte to send; y counts its eight bits.
   pull block
  set y, 7    
  set pindirs, 1 
bit1:
  ; Each bit slot: short low, then the data bit from OSR, then high.
  set pins, 0 [1]
  out pins,1 [31]
    set pins, 1 [20]
   jmp y--,bit1
jmp x--,bytes1

set pindirs, 0 [31]
jmp again

; Read: the next word is the number of bytes to read, minus one.
read:
  pull block
  mov x, osr
bytes2:
  set y, 7
bit2:
  ; Each bit slot: brief low pulse, release the line, then sample it.
  set pindirs, 1 
  set pins, 0 [1]  
  set pindirs, 0 [5]
  in pins,1 [10]   
jmp y--,bit2
jmp x--,bytes2
.wrap

C Program

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/gpio.h"
#include "DS1820.pio.h"

/*
 * Bitwise CRC-8 over len bytes of data, processed least significant bit
 * first with the reflected polynomial 0x8C and an initial value of 0.
 * Running it over a message followed by its own CRC byte yields 0.
 */
uint8_t crc8(uint8_t *data, uint8_t len)
{
    uint8_t crc = 0;

    for (uint8_t n = 0; n < len; n++)
    {
        /* Folding the whole byte in first is equivalent to comparing the
           low bits of crc and the data byte one shift at a time. */
        crc ^= data[n];
        for (uint8_t remaining = 8; remaining > 0; remaining--)
        {
            uint8_t feedback = crc & 0x01;

            crc >>= 1;
            if (feedback)
            {
                crc ^= 0x8C;
            }
        }
    }

    return crc;
}

/*
 * Ask the PIO program to write len bytes: first the non-zero word 250 (the
 * low-pulse loop count, which also selects "write"), then len - 1, then one
 * FIFO word per byte.
 */
void writeBytes(PIO pio, uint sm, uint8_t bytes[], int len)
{
    pio_sm_put_blocking(pio, sm, 250);
    pio_sm_put_blocking(pio, sm, len - 1);

    int sent = 0;
    while (sent < len)
    {
        pio_sm_put_blocking(pio, sm, bytes[sent]);
        sent++;
    }
}
/*
 * Ask the PIO program to read len bytes: first the word 0 (selects "read"),
 * then len - 1. Each received FIFO word carries its byte in the top eight
 * bits.
 */
void readBytes(PIO pio, uint sm, uint8_t bytes[], int len)
{
    pio_sm_put_blocking(pio, sm, 0);
    pio_sm_put_blocking(pio, sm, len - 1);

    int received = 0;
    while (received < len)
    {
        bytes[received] = pio_sm_get_blocking(pio, sm) >> 24;
        received++;
    }
}

/*
 * Run one temperature conversion through the PIO one-wire program: send
 * 0xCC 0x44, wait one second, send 0xCC 0xBE and read nine bytes back.
 * Returns raw signed 16-bit value / 16, or -2000 when the CRC over the nine
 * bytes is not zero.
 */
float getTemperature(PIO pio, uint sm)
{
    uint8_t scratch[9];

    writeBytes(pio, sm, (uint8_t[]){0xCC, 0x44}, 2);
    sleep_ms(1000);
    writeBytes(pio, sm, (uint8_t[]){0xCC, 0xBE}, 2);
    readBytes(pio, sm, scratch, 9);

    if (crc8(scratch, 9) != 0)
    {
        return -2000;
    }

    int low = scratch[0];
    int high = scratch[1];
    int16_t raw = (high << 8 | low);
    volatile float celsius = (float)raw / 16;
    return celsius;
}

/*
 * Load the DS1820 PIO program into pio, claim a free state machine and
 * configure gpio as its SET, OUT and IN pin (clock divider 255, input shift
 * to the right with autopush at 8 bits). Returns the state machine number.
 * The state machine is now initialised and enabled on the pio that was
 * passed in (previously pio0 regardless of the argument).
 */
uint DS18Initalize(PIO pio, int gpio)
{

    uint offset = pio_add_program(pio, &DS1820_program);

    uint sm = pio_claim_unused_sm(pio, true);
    pio_gpio_init(pio, gpio);

    pio_sm_config c = DS1820_program_get_default_config(offset);
    sm_config_set_clkdiv_int_frac(&c, 255, 0);
    sm_config_set_set_pins(&c, gpio, 1);
    sm_config_set_out_pins(&c, gpio, 1);
    sm_config_set_in_pins(&c, gpio);
    sm_config_set_in_shift(&c, true, true, 8);
    pio_sm_init(pio, sm, offset, &c);
    pio_sm_set_enabled(pio, sm, true);
    return sm;
}

/*
 * Print the temperature from the PIO one-wire reader on GPIO 2 of pio0
 * every 500 ms. A reading below -999 is getTemperature()'s error code and
 * is retried immediately.
 */
int main()
{
    float t;

    stdio_init_all();

    uint sm = DS18Initalize(pio0, 2);

    while (true)
    {
        t = getTemperature(pio0, sm);
        while (t < -999)
        {
            t = getTemperature(pio0, sm);
        }
        printf("temperature %f\r\n", t);
        sleep_ms(500);
    }

    return 0;
}
CMakeLists.txt
# Build file for the PIO one-wire reader: DS1820.c plus the header generated
# from DS1820.pio.
cmake_minimum_required(VERSION 3.13)

set(CMAKE_C_STANDARD 11)
set(CMAKE_CXX_STANDARD 17)

# Pull in the SDK import script before the project is declared.
include(pico_sdk_import.cmake)
project(ds18 C CXX ASM)
# Initialise the SDK once the project exists.
pico_sdk_init()
add_executable(ds18
 DS1820.c
)
# Assemble DS1820.pio into the DS1820.pio.h header included by DS1820.c.
pico_generate_pio_header(ds18 ${CMAKE_CURRENT_LIST_DIR}/DS1820.pio)
target_link_libraries(ds18  pico_stdlib hardware_gpio hardware_pio)
pico_add_extra_outputs(ds18)

Page 322

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/gpio.h"

/*
 * UART test on uart1 (GPIO 4 and 5, 9600 baud, 8 data bits, 1 stop bit,
 * even parity): send the 11 characters of "Hello World", read 11 bytes back
 * and print them as a string.
 */
int main()
{
    enum { MESSAGE_LEN = 11 };
    uint8_t SendData[] = "Hello World";
    uint8_t RecData[20];

    stdio_init_all();
    uart_init(uart1, 9600);
    gpio_set_function(4, GPIO_FUNC_UART);
    gpio_set_function(5, GPIO_FUNC_UART);
    uart_set_format(uart1, 8, 1, UART_PARITY_EVEN);

    uart_write_blocking(uart1, SendData, MESSAGE_LEN);
    uart_read_blocking(uart1, RecData, MESSAGE_LEN);
    /* Terminate the received bytes so they can be printed with %s. */
    RecData[MESSAGE_LEN] = 0;
    printf("%s", RecData);
}

Page 334

#include "pico/stdlib.h"
#include "hardware/gpio.h"

/* When true, getBlock() echoes every byte received on uart1 to uart0. */
#define Debug true
/*
 * Set up uart1 on GPIO 4 and 5 at 115200 baud, 8 data bits, 1 stop bit, no
 * parity, then wait 100 ms. Always returns 0.
 */
int initWiFi()
{
    enum { BAUD = 115200, SETTLE_MS = 100 };

    uart_init(uart1, BAUD);
    gpio_set_function(4, GPIO_FUNC_UART);
    gpio_set_function(5, GPIO_FUNC_UART);
    uart_set_format(uart1, 8, 1, UART_PARITY_NONE);
    sleep_ms(SETTLE_MS);
    return 0;
}
int getBlock(uint8_t buf[], int len)
{
    int count = 0;
    while (count < len - 1)
    {
        if (uart_is_readable_within_us(uart1, 10000))
        {
            buf[count++] = uart_getc(uart1);
            if (Debug)
                uart_putc(uart0, buf[count - 1]);
        }
        else
        {
            break;
        }
    }
    buf[count] = 0;
    return count;
}

/* Send "AT\r\n" on uart1 and collect the reply into buf with getBlock(). */
int ATWiFi(uint8_t buf[], int len)
{
    uint8_t command[] = "AT\r\n";

    /* sizeof counts the terminating NUL, which is not transmitted. */
    uart_write_blocking(uart1, command, sizeof command - 1);
    return getBlock(buf, len);
}

/* Initialise the UART link, send one AT command and wait a second. */
int main()
{
    enum { REPLY_SIZE = 512 };
    uint8_t buf[REPLY_SIZE];

    stdio_init_all();
    initWiFi();
    ATWiFi(buf, REPLY_SIZE);
    sleep_ms(1000);
}

Page 347

#include <stdio.h>
#include "pico/stdlib.h"
#include "hardware/gpio.h"
#include <string.h>

#define Debug true
/*
 * Prepare uart1 for the WiFi module (115200 baud on GPIO 4 and 5, 8 data
 * bits, 1 stop bit, no parity), enable CR/LF translation on uart0 and
 * pause for 100 ms. Always returns 0.
 */
int initWiFi()
{
    enum { WIFI_BAUD = 115200, WIFI_GPIO_A = 4, WIFI_GPIO_B = 5 };

    uart_init(uart1, WIFI_BAUD);
    gpio_set_function(WIFI_GPIO_A, GPIO_FUNC_UART);
    gpio_set_function(WIFI_GPIO_B, GPIO_FUNC_UART);
    uart_set_format(uart1, 8, 1, UART_PARITY_NONE);

    /* uart0 carries the debug echo written by getBlock. */
    uart_set_translate_crlf(uart0, true);

    sleep_ms(100);
    return 0;
}

int getBlock(uint8_t buf[], int len)
{
    int count = 0;
    while (count < len - 1)
    {
        if (uart_is_readable_within_us(uart1, 10000))
        {
            buf[count++] = uart_getc(uart1);
            if (Debug)
                uart_putc(uart0, buf[count - 1]);
        }
        else
        {
            break;
        }
    }
    buf[count] = 0;
    return count;
}

/*
 * Send the bare "AT" command and collect the reply.
 * Returns the number of reply bytes stored in buf (see getBlock).
 */
int ATWiFi(uint8_t buf[], int len)
{
    static const uint8_t cmd[] = "AT\r\n";

    /* sizeof includes the terminator, which is not transmitted. */
    uart_write_blocking(uart1, cmd, sizeof cmd - 1);
    return getBlock(buf, len);
}

/*
 * Send "AT+GMR" and collect the reply.
 * Returns the number of reply bytes stored in buf (see getBlock).
 */
int getVersionWiFi(uint8_t buf[], int len)
{
    static const uint8_t cmd[] = "AT+GMR\r\n";

    uart_write_blocking(uart1, cmd, sizeof cmd - 1);
    return getBlock(buf, len);
}

/*
 * Send "AT+RST" and collect the reply.
 * Returns the number of reply bytes stored in buf (see getBlock).
 */
int resetWiFi(uint8_t buf[], int len)
{
    static const uint8_t cmd[] = "AT+RST\r\n";

    uart_write_blocking(uart1, cmd, sizeof cmd - 1);
    return getBlock(buf, len);
}

/*
 * Send "AT+UART_CUR=115200,8,1,0,0" and collect the reply.
 * Returns the number of reply bytes stored in buf (see getBlock).
 */
int setUARTWiFi(uint8_t buf[], int len)
{
    static const uint8_t cmd[] = "AT+UART_CUR=115200,8,1,0,0\r\n";

    uart_write_blocking(uart1, cmd, sizeof cmd - 1);
    return getBlock(buf, len);
}

/*
 * Send "AT+CWMODE_CUR=<mode>" and collect the reply.
 *
 * mode: integer formatted verbatim into the command.
 * Returns the number of reply bytes stored in buf (see getBlock).
 */
int modeWiFi(uint8_t buf[], int len, int mode)
{
    uint8_t cmd[32];
    int cmdLen = snprintf(cmd, sizeof cmd, "AT+CWMODE_CUR=%d\r\n", mode);

    uart_write_blocking(uart1, cmd, cmdLen);
    return getBlock(buf, len);
}

/*
 * Keep reading reply bursts until one of them contains target.
 *
 * Makes up to num attempts. Each attempt waits as long as one second for
 * uart1 to become readable, reads a burst with getBlock and searches it.
 *
 * Returns the zero-based index of the attempt that matched, or -1 when no
 * attempt produced the target text.
 */
int getBlocks(uint8_t buf[], int len, int num, char target[])
{
    int attempt = 0;

    while (attempt < num)
    {
        if (uart_is_readable_within_us(uart1, 1000 * 1000))
        {
            getBlock(buf, len);
            if (strstr(buf, target) != NULL)
                return attempt;
        }
        attempt++;
    }
    return -1;
}

/*
 * Send "AT+CWLAP" and read reply bursts until one contains "OK".
 *
 * Returns the index of the burst that contained "OK", or -1 if none of the
 * 20 attempts did (see getBlocks).
 */
int scanWiFi(uint8_t buf[], int len)
{
    static const uint8_t SendData[] = "AT+CWLAP\r\n";

    /*
     * The command is 10 characters long; take the length from the array so
     * the write can never run past it (the previous hard-coded 18 did).
     */
    uart_write_blocking(uart1, SendData, sizeof SendData - 1);
    return getBlocks(buf, len, 20, "OK");
}

/*
 * Send AT+CWJAP_CUR="<ssid>","<pass>" and read reply bursts until one
 * contains "OK".
 *
 * ssid, pass: NUL-terminated strings inserted verbatim between the quotes.
 * Returns the index of the burst that contained "OK" (see getBlocks), or -1
 * when no "OK" arrived or the command does not fit the 128-byte buffer.
 */
int connectWiFi(uint8_t buf[], int len, char ssid[], char pass[])
{
    char command[128];
    int count = snprintf(command, sizeof command,
                         "AT+CWJAP_CUR=\"%s\",\"%s\"\r\n", ssid, pass);

    /*
     * snprintf reports the length it wanted to write; on truncation that is
     * larger than the buffer and must not be used as the transmit length.
     */
    if (count < 0 || (size_t)count >= sizeof command)
        return -1;

    uart_write_blocking(uart1, (const uint8_t *)command, (size_t)count);
    return getBlocks(buf, len, 20, "OK");
}

/*
 * Send "AT+CIFSR" and read reply bursts until one contains "OK".
 * Returns the index of that burst, or -1 if none of the 20 attempts
 * contained it (see getBlocks).
 */
int getIPWiFi(uint8_t buf[], int len)
{
    static const uint8_t cmd[] = "AT+CIFSR\r\n";

    uart_write_blocking(uart1, cmd, sizeof cmd - 1);
    return getBlocks(buf, len, 20, "OK");
}

/*
 * Fetch a page over HTTP through the WiFi module.
 *
 * Opens a TCP connection to URL on port 80, announces the request length
 * with AT+CIPSEND, sends "GET <page> HTTP/1.1" with a Host header and reads
 * reply bursts until one contains "</html>".
 *
 * URL:  host name, used both for the connection and the Host header.
 * page: path placed after GET.
 * Returns the index of the burst containing "</html>" (see getBlocks), or
 * -1 when any step fails or a command/request does not fit its buffer.
 */
int getWebPageWiFi(uint8_t buf[], int len, char URL[], char page[])
{
    char command[128];
    int count = snprintf(command, sizeof command,
                         "AT+CIPSTART=\"TCP\",\"%s\",80\r\n", URL);
    /* A truncated command must not be sent, nor its length trusted. */
    if (count < 0 || (size_t)count >= sizeof command)
        return -1;
    uart_write_blocking(uart1, (const uint8_t *)command, (size_t)count);
    if (getBlocks(buf, len, 20, "OK") < 0)
        return -1;

    char http[150];
    int httpLen = snprintf(http, sizeof http,
                           "GET %s HTTP/1.1\r\nHost:%s\r\n\r\n", page, URL);
    /* Bounded formatting: a long page or URL used to overflow http[]. */
    if (httpLen < 0 || (size_t)httpLen >= sizeof http)
        return -1;

    count = snprintf(command, sizeof command, "AT+CIPSEND=%d\r\n", httpLen);
    uart_write_blocking(uart1, (const uint8_t *)command, (size_t)count);
    if (getBlocks(buf, len, 20, ">") < 0)
        return -1;

    uart_write_blocking(uart1, (const uint8_t *)http, (size_t)httpLen);
    return getBlocks(buf, len, 20, "</html>");
}

/*
 * Run a minimal HTTP server through the WiFi module.
 *
 * Sends AT+CIPMUX=1 and AT+CIPSERVER=1,80, then loops forever: waits for a
 * "+IPD" notification, takes the text between "+IPD," and the next comma as
 * the link id, sends a fixed HTML page to that link with AT+CIPSEND and
 * closes it with AT+CIPCLOSE.
 *
 * Returns -1 when the module fails to answer a command as expected. A
 * malformed "+IPD" notification is skipped and the server keeps waiting.
 * The trailing return 0 is never reached.
 */
int startServerWiFi(uint8_t buf[], int len)
{
    static const char data[] = "HTTP/1.0 200 OK\r\nServer: Pico\r\nContent-type: text/html\r\n\r\n<html><head><title>Temperature</title></head><body><p>{\"humidity\":81%,\"airtemperature\":23.5C}</p></body></html>\r\n";
    char id[10];
    char command[128];

    uart_write_blocking(uart1, (const uint8_t *)"AT+CIPMUX=1\r\n", 13);
    if (getBlocks(buf, len, 10, "OK") < 0)
        return -1;
    uart_write_blocking(uart1, (const uint8_t *)"AT+CIPSERVER=1,80\r\n", 19);
    if (getBlocks(buf, len, 10, "OK") < 0)
        return -1;

    for (;;)
    {
        if (getBlocks(buf, len, 1, "+IPD") < 0)
            continue;

        /* Locate the id field that follows "+IPD" and its separator. */
        const char *b = strstr((const char *)buf, "+IPD");
        if (b == NULL || b[4] == '\0')
            continue;
        b += 5;

        /* The id ends at the next comma and must fit id[] with its NUL. */
        const char *e = strchr(b, ',');
        if (e == NULL || (size_t)(e - b) >= sizeof id)
            continue;
        size_t d = (size_t)(e - b);
        memcpy(id, b, d);
        id[d] = '\0';

        int count = snprintf(command, sizeof command, "AT+CIPSEND=%s,%d\r\n",
                             id, (int)(sizeof data - 1));
        uart_write_blocking(uart1, (const uint8_t *)command, (size_t)count);
        if (getBlocks(buf, len, 10, ">") < 0)
            return -1;

        uart_write_blocking(uart1, (const uint8_t *)data, sizeof data - 1);
        if (getBlocks(buf, len, 10, "OK") < 0)
            return -1;

        count = snprintf(command, sizeof command, "AT+CIPCLOSE=%s\r\n", id);
        uart_write_blocking(uart1, (const uint8_t *)command, (size_t)count);
        if (getBlocks(buf, len, 10, "OK") < 0)
            return -1;
    }
    return 0;
}

/*
 * Bring the module up in mode 1, join the access point, query the IP
 * address and then serve pages. startServerWiFi only comes back on error.
 */
int main()
{
    uint8_t reply[512];

    stdio_init_all();
    initWiFi();

    modeWiFi(reply, sizeof reply, 1);
    connectWiFi(reply, sizeof reply, "ssid", "password");
    getIPWiFi(reply, sizeof reply);
    startServerWiFi(reply, sizeof reply);

    sleep_ms(1000);
}

Programming the Raspberry Pi Pico in MicroPython

picoPython360

 

We have decided not to make the programs available as a download because this is not the point of the book - the programs are not finished production code but something you should type in and study.

The best solution is to provide the source code of the programs in a form that can be copied and pasted into a Thonny or VS Code project. 

The only downside is that you have to create a project to paste the code into.

To do this follow the instructions in the book.

All of the programs below were copy and pasted from working programs in the IDE. They have been formatted using the built in formatter and hence are not identical in layout to the programs in the book. This is to make copy and pasting them easier. The programs in the book are formatted to be easy to read on the page.

If anything you consider important is missing or if you have any requests or comments  contact:

This email address is being protected from spambots. You need JavaScript enabled to view it. 

Page 30

from machine import Pin
import time

# Blink GPIO 25: one second high, one second low, forever.
led = Pin(25, Pin.OUT)

while True:
    for level in (1, 0):
        led.value(level)
        time.sleep(1)

Page 30

from machine import Pin
import time

# Flip GPIO 22 once per second.
output = Pin(22, Pin.OUT)
while True:
    output.toggle()
    time.sleep(1)

Page 34

from machine import  Pin
pin = Pin(22, Pin.OUT)
# No delay between the writes: GPIO 22 is driven high and low as fast as
# the loop itself executes.
while True:
    pin.value(1)
    pin.value(0)

Page 34

from machine import  Pin
def flash():
    """Drive GPIO 22 high then low in an endless, delay-free loop; never returns."""
    pin = Pin(22, Pin.OUT)
    while True:
        pin.value(1)
        pin.value(0)
flash()

Page 35

from machine import Pin


# Same loop as the plain function version, marked for the native code emitter.
@micropython.native
def flash():
    """Drive GPIO 22 high then low in an endless, delay-free loop; never returns."""
    pin = Pin(22, Pin.OUT)
    while True:
        pin.value(1)
        pin.value(0)

flash()

Page 36

from machine import Pin
import time

# Square wave on GPIO 22: half a second high, half a second low.
output = Pin(22, Pin.OUT)
while True:
    for level in (1, 0):
        output.value(level)
        time.sleep(0.5)

Page 36

from machine import Pin
import time

pin = Pin(22, Pin.OUT)
# Requests a 10 microsecond delay after each edge on GPIO 22.
while True:
    pin.value(1)
    time.sleep_us(10)
    pin.value(0)
    time.sleep_us(10)

Page 32

from machine import Pin
import time
pin = Pin(22, Pin.OUT)
# Number of empty loop passes used as the delay before each edge.
n = 10
while True:
    # Busy-wait: the delay is whatever n empty iterations take.
    for i in range(n):
        pass
    pin.value(1)
    for i in range(n):
        pass
    pin.value(0)

Page 38

from machine import Pin

pin1 = Pin(21, Pin.OUT)
pin2 = Pin(22, Pin.OUT)
# The two pins are written one after the other, so each pair of writes
# changes GPIO 21 first and GPIO 22 second.
while True:
    pin1.value(1)
    pin2.value(0)
    pin1.value(0)
    pin2.value(1)

Page 41

from machine import Pin
import machine

def gpio_get():
    """Return the 32-bit word stored at address 0xd0000000 + 0x010.

    This is the same location gpio_set reads as the current output state.
    """
    address = 0xd0000000 + 0x010
    return machine.mem32[address]

def gpio_set(value, mask):
    """Set several GPIO outputs in one write, leaving other pins untouched.

    value: desired level for each pin, one bit per GPIO number.
    mask:  bits set for the pins that are allowed to change.

    The current output word is read from offset 0x010, the bits that differ
    from `value` are worked out with XOR, limited to `mask`, and written to
    offset 0x01C.
    """
    current = machine.mem32[0xd0000000 + 0x010]
    # Parentheses matter: `&` binds tighter than `^`, so without them the
    # mask would be applied to `value` only and every set bit outside the
    # mask would be written as well.
    machine.mem32[0xd0000000 + 0x01C] = (current ^ value) & mask


# GPIO 22 and GPIO 21 are both configured as outputs; the name `pin` is
# simply reused and not needed afterwards.
pin = Pin(22, Pin.OUT)
pin = Pin(21, Pin.OUT)
# value1: bit 22 set, bit 21 clear. value2: the opposite. mask: both bits.
value1 = 1 << 22 | 0 << 21
value2 = 0 << 22 | 1 << 21
mask = 1 << 22 | 1 << 21
# Alternate between the two patterns with no delay.
while True:
    gpio_set(value1, mask)
    gpio_set(value2, mask)

Page 61

from machine import Pin
import time
# Copy the level on GPIO 22 (pull-down input) to the LED on GPIO 25,
# sampling twice a second.
button = Pin(22, Pin.IN, Pin.PULL_DOWN)
led = Pin(25, Pin.OUT)

while True:
    led.value(1 if button.value() else 0)
    time.sleep(0.5)

Page 62

from machine import Pin
import time
# Light the LED on GPIO 25 for one second after each complete
# press-and-release of the button on GPIO 22.
button = Pin(22, Pin.IN, Pin.PULL_DOWN)
led = Pin(25, Pin.OUT)

while True:
    # Wait for the press ...
    while not button.value():
        pass
    # ... and then for the release.
    while button.value():
        pass
    led.on()
    time.sleep(1)
    led.off()

Page 63

 
from machine import Pin
import time
# Time how long the button on GPIO 22 is held. A press shorter than two
# seconds gives one long flash on GPIO 25; a longer one gives ten quick
# flashes.
button = Pin(22, Pin.IN, Pin.PULL_DOWN)
led = Pin(25, Pin.OUT)

while True:
    while not button.value():
        pass
    pressed_at = time.ticks_ms()
    time.sleep_ms(1)
    while button.value():
        pass
    held_ms = time.ticks_diff(time.ticks_ms(), pressed_at)

    if held_ms >= 2000:
        for _ in range(10):
            led.on()
            time.sleep_ms(100)
            led.off()
            time.sleep_ms(100)
    else:
        led.on()
        time.sleep(1)
        led.off()

Page 64

from machine import Pin
import time
pinIn = Pin(22, Pin.IN)

# Measure the width of a high pulse on GPIO 22 by polling.
while True:
    # Let any pulse already in progress finish, then wait for the next
    # rising edge so timing starts at the beginning of a pulse.
    while pinIn.value() == 1:
        pass
    while pinIn.value() == 0:
        pass

    t = time.ticks_us()
    while pinIn.value() == 1:
        pass
    # Elapsed microseconds between the two polls that saw the edges.
    t = time.ticks_diff(time.ticks_us(), t)
    print(t)
    time.sleep(1)

Page 65

import time
import machine

# Print the result of machine.time_pulse_us for level 1 on GPIO 22,
# once a second.
pulse_pin = machine.Pin(22, machine.Pin.IN)

while True:
    width = machine.time_pulse_us(pulse_pin, 1)
    print(width)
    time.sleep(1)

Page 65

import machine
import time

import machine
pinIn = machine.Pin(22, machine.Pin.IN)

while True:
    # Wait for the line to be low before asking for a level-1 pulse time.
    while pinIn.value() == 1:
        pass
    t = machine.time_pulse_us(pinIn, 1)
    print(t)
    time.sleep(1)

Page 68

from machine import Pin
import time
# Polled button state machine: counts presses of the button on GPIO 22,
# sampling the input every 100 ms.
pinIn = Pin(22, Pin.IN)

s = 0      # 0 = button not pushed, 1 = button pushed
count = 0  # number of presses seen so far
while True:
    i = pinIn.value()
    # Deadline for the end of this 100 ms polling slot.
    t = time.ticks_add(time.ticks_us(), 1000*100)
    if s == 0:  # button not pushed
        if i:
            s = 1
            count = count+1
            print("Button Push ", count)
    elif s == 1:  # button pushed
        if not i:
            s = 0
    else:
        s = 0
    # Tick values wrap around, so the deadline is tested with ticks_diff
    # rather than a plain comparison.
    while time.ticks_diff(t, time.ticks_us()) > 0:
        pass

Page 70

from machine import Pin
import time
# Polled button state machine: on release, reports whether the button on
# GPIO 22 was held for more than two seconds. Sampled every 100 ms.
pinIn = Pin(22, Pin.IN)

s = 0  # 0 = button not pushed, 1 = button pushed
while True:
    i = pinIn.value()
    # Deadline for the end of this 100 ms polling slot.
    t = time.ticks_add(time.ticks_us(), 1000*100)
    if s == 0:  # button not pushed
        if i:
            s = 1
        # Refreshed on every idle poll, so it holds the slot in which the
        # press was detected.
        tpush = t
    elif s == 1:  # button pushed
        if not i:
            s = 0
            if time.ticks_diff(t, tpush) > 2000000:
                print("Button held \n\r")
            else:
                print("Button pushed \n\r")
    else:
        s = 0
    # Tick values wrap around, so the deadline is tested with ticks_diff
    # rather than a plain comparison.
    while time.ticks_diff(t, time.ticks_us()) > 0:
        pass

Page 71  Corrected - see Errata.

from machine import Pin
import time
# Three-state sequencer: each detected edge on the button (GPIO 22) moves
# the lit LED along GPIO 21 -> 20 -> 19 -> 21. Sampled every 100 ms.
pinIn = Pin(22, Pin.IN)
pinLED1 = Pin(21, Pin.OUT)
pinLED2 = Pin(20, Pin.OUT)
pinLED3 = Pin(19, Pin.OUT)
pinLED1.on()
pinLED2.off()
pinLED3.off()
s=0
buttonState=pinIn.value()
while True:
    buttonNow=pinIn.value()
    # 1 only when the previous sample was 1 and the current one is 0.
    edge=buttonState-buttonNow
    buttonState=buttonNow
    # Deadline for the end of this 100 ms polling slot.
    t=time.ticks_add(time.ticks_us(),1000*100)
    if s==0:
        if edge==1:
            s=1
            pinLED1.off()
            pinLED2.on()
            pinLED3.off()

    elif s==1:
        if edge==1:
            s=2
            pinLED1.off()
            pinLED2.off()
            pinLED3.on()
    elif s==2:
        if edge==1:
            s=0
            pinLED1.on()
            pinLED2.off()
            pinLED3.off()
    else:
        s=0
    # Tick values wrap around, so the deadline is tested with ticks_diff
    # rather than a plain comparison.
    while time.ticks_diff(t, time.ticks_us()) > 0:
        pass

Page 79

from utime import sleep
from machine import Pin
import machine


def gpio_get_events(pinNo):
    """Return the 4-bit event field recorded for GPIO `pinNo`.

    Each 32-bit register starting at 0x40014000 + 0x0f0 holds eight pins,
    four bits per pin; the field for this pin is shifted down to bits 0-3.
    """
    register, slot = divmod(pinNo, 8)
    shift = 4 * slot
    raw = machine.mem32[0x40014000 + 0x0f0 + register*4]
    return (raw >> shift) & 0xF


def gpio_clear_events(pinNo, events):
    """Write `events` into the 4-bit field of GPIO `pinNo`.

    The field lives in the register at 0x40014000 + 0x0f0 + 4*(pinNo // 8);
    only this pin's bits are non-zero in the value written.
    """
    register, slot = divmod(pinNo, 8)
    machine.mem32[0x40014000 + 0x0f0 + register*4] = events << (4 * slot)


# Twice a second, report which edge events were recorded for GPIO 22 and
# then clear both of them.
pin = Pin(22, Pin.IN, Pin.PULL_UP)
reports = ((Pin.IRQ_FALLING, "falling"), (Pin.IRQ_RISING, "rising"))
while True:
    recorded = gpio_get_events(22)
    for flag, label in reports:
        if recorded & flag:
            print(label)
    gpio_clear_events(22, Pin.IRQ_FALLING | Pin.IRQ_RISING)
    sleep(0.5)

Page 79

from utime import sleep
from machine import Pin

# Sample the button once, ten seconds after the prompt. Only the level at
# that instant is seen.
button = Pin(22, Pin.IN, Pin.PULL_DOWN)

print("Press Button")
sleep(10)
print("Button Pressed" if button.value() else "Button Not Pressed")

Page 80

from utime import sleep
from machine import Pin
import machine


def gpio_get_events(pinNo):
    """Return the 4-bit event field recorded for GPIO `pinNo`.

    Each 32-bit register starting at 0x40014000 + 0x0f0 holds eight pins,
    four bits per pin; the field for this pin is shifted down to bits 0-3.
    """
    register, slot = divmod(pinNo, 8)
    shift = 4 * slot
    raw = machine.mem32[0x40014000 + 0x0f0 + register*4]
    return (raw >> shift) & 0xF


def gpio_clear_events(pinNo, events):
    """Write `events` into the 4-bit field of GPIO `pinNo`.

    The field lives in the register at 0x40014000 + 0x0f0 + 4*(pinNo // 8);
    only this pin's bits are non-zero in the value written.
    """
    register, slot = divmod(pinNo, 8)
    machine.mem32[0x40014000 + 0x0f0 + register*4] = events << (4 * slot)


# Clear the falling-edge event for GPIO 22, wait ten seconds, then report
# whether one was recorded in the meantime.
button = Pin(22, Pin.IN, Pin.PULL_DOWN)

print("Press Button")
gpio_clear_events(22, Pin.IRQ_FALLING)
sleep(10)
recorded = gpio_get_events(22)
gpio_clear_events(22, Pin.IRQ_FALLING)
print("Button Pressed" if recorded & Pin.IRQ_FALLING else "Button Not Pressed")

Page 81

import time
from utime import sleep
from machine import Pin
import machine


def gpio_get_events(pinNo):
    """Return the 4-bit event field recorded for GPIO `pinNo`.

    Each 32-bit register starting at 0x40014000 + 0x0f0 holds eight pins,
    four bits per pin; the field for this pin is shifted down to bits 0-3.
    """
    register, slot = divmod(pinNo, 8)
    shift = 4 * slot
    raw = machine.mem32[0x40014000 + 0x0f0 + register*4]
    return (raw >> shift) & 0xF


def gpio_clear_events(pinNo, events):
    """Write `events` into the 4-bit field of GPIO `pinNo`.

    The field lives in the register at 0x40014000 + 0x0f0 + 4*(pinNo // 8);
    only this pin's bits are non-zero in the value written.
    """
    register, slot = divmod(pinNo, 8)
    machine.mem32[0x40014000 + 0x0f0 + register*4] = events << (4 * slot)


pin = Pin(22, Pin.IN, Pin.PULL_DOWN)
# Measure a pulse on GPIO 22 using the recorded edge events instead of the
# pin level.
while True:
    # Start from a clean slate so stale events are not mistaken for edges.
    gpio_clear_events(22, Pin.IRQ_FALLING | Pin.IRQ_RISING)
    while not(gpio_get_events(22) & Pin.IRQ_RISING):
        pass
    t = time.ticks_us()
    while not(gpio_get_events(22) & Pin.IRQ_FALLING):
        pass
    # Microseconds between noticing the rising and the falling event.
    t = time.ticks_diff(time.ticks_us(), t)
    print(t)
    sleep(1)

Page 84

import time
from machine import Pin
import machine
import array

# Number of timestamps stored so far.
count = 0
# Room for 20 unsigned timestamps.
t = array.array('L', [0]*20)


def myHandler(pin):
    """Interrupt handler: store ticks_us() for each call.

    After the 20th call it prints the 19 differences between consecutive
    timestamps and calls pin.irq again with None as the handler.
    """
    global t, count
    t[count] = time.ticks_us()
    count = count+1
    if count > 19:
        for i in range(1, 20):
            print(time.ticks_diff(t[i], t[i-1]))
        pin.irq(None, Pin.IRQ_RISING, hard=True)
        return


# Call myHandler on each rising edge of GPIO 22.
pin = Pin(22, Pin.IN, Pin.PULL_DOWN)
pin.irq(myHandler, Pin.IRQ_RISING, hard=False)

Page 86

import time
from machine import Pin


def myHandler(pin):
    """Interrupt handler: print the current microsecond tick count."""
    print(time.ticks_us())


pin = Pin(22, Pin.IN, Pin.PULL_DOWN)
# Registered with hard=True for rising edges on GPIO 22.
pin.irq(myHandler, Pin.IRQ_RISING, hard=True)
# The main program keeps printing while the handler runs on each edge.
while True:
    print("doing something useful")

Page 92

from machine import Pin, PWM
# PWM on GPIO 16 and GPIO 17. The frequency is set through the GPIO 16
# object only; the duties are half and a quarter of full scale.
FULL_SCALE = 65535

chanA = PWM(Pin(16))
chanB = PWM(Pin(17))

chanA.freq(250)

chanA.duty_u16(FULL_SCALE//2)
chanB.duty_u16(FULL_SCALE//4)

Page 93

from machine import Pin, PWM
import machine


def pwm_set_phase(sliceNo, phase):
    """Set or clear bit 1 of the register at 0x40050000 + 0x14*sliceNo.

    phase: truthy sets the bit, falsy clears it; every other bit of the
    register is written back unchanged.
    """
    register = 0x40050000 + 0x14*sliceNo
    current = machine.mem32[register]
    machine.mem32[register] = (current | 0x2) if phase else (current & 0xFFFFFFFD)


pwm16 = PWM(Pin(16))
pwm17 = PWM(Pin(17))
pwm16.freq(250)
# Set the phase bit of slice 0 before the duty cycles are applied.
pwm_set_phase(0, True)
pwm16.duty_u16(65535//2)
pwm17.duty_u16(65535//4)

Page 94

from machine import Pin, PWM
import machine


def pwm_set_polarity(sliceNo, channel, invert):
    """Set or clear bit (2 + channel) of the register at 0x40050000 + 0x14*sliceNo.

    invert: truthy sets the bit, falsy clears it; every other bit of the
    register is written back unchanged.
    """
    register = 0x40050000 + 0x14*sliceNo
    bit = 0x1 << (2+channel)
    current = machine.mem32[register]
    if invert:
        machine.mem32[register] = current | bit
    else:
        machine.mem32[register] = current & ~bit


pwm16 = PWM(Pin(16))
pwm17 = PWM(Pin(17))

pwm16.freq(250)
# Invert channel 1 of slice 0; both outputs are then given the same duty.
pwm_set_polarity(0, 1, True)
pwm16.duty_u16(65535//4)
pwm17.duty_u16(65535//4)

Page 95

from machine import Pin, PWM

pwm16 = PWM(Pin(16))

pwm16.freq(50)

pwm16.duty_u16(65535//2)
# Switch between a half and a quarter duty value as fast as the loop runs.
while True:
    pwm16.duty_u16(65535//2)
    pwm16.duty_u16(65535//4)

Page 98

from machine import Pin, PWM
import array
import machine
import math


def pwm_get_wrap(sliceNo):
    """Return the 32-bit word at 0x40050000 + 0x10 + 0x14*sliceNo."""
    register = 0x40050000 + 0x10+0x14*sliceNo
    value = machine.mem32[register]
    return value


# One period of a sine wave as 256 unsigned 16-bit duty values, centred on
# half of full scale.
wave = array.array('H', [0]*256)
for i in range(256):
    sample = int(65535//2 + (math.sin(i * 2.0 * 3.14159 / 256.0) * 65535//2))
    # At the trough (i == 192) the floor division gives -32768, so the sum
    # is -1, which does not fit an unsigned 16-bit slot. Clamp to the
    # valid duty range.
    wave[i] = min(65535, max(0, sample))


pwm16 = PWM(Pin(16))
pwm16.freq(125000000//256)

# Show the wrap value that results for slice 0.
print(pwm_get_wrap(0))
# Step through the table continuously; the output rate is set by how fast
# this loop runs.
while(True):
    for i in range(256):
        pwm16.duty_u16(wave[i])

Page 28

from utime import sleep_ms
from machine import Pin, PWM

# Ramp the PWM duty on GPIO 25 from 0 upwards in steps of 655, one step
# every 50 ms, then start again.
led = PWM(Pin(25))
led.freq(2000)

while True:
    duty = 0
    while duty < 65535:
        led.duty_u16(duty)
        sleep_ms(50)
        duty += 655

Page 101

from utime import sleep_ms
from machine import Pin, PWM

# Ramp the PWM duty on GPIO 25 following a cubic curve: step b (0..99)
# maps to 65535 * b^3 / 1000000. One step every 50 ms, then start again.
led = PWM(Pin(25))
led.freq(2000)

while True:
    step = 0
    while step < 100:
        led.duty_u16(int(65535*step*step*step/1000000))
        sleep_ms(50)
        step += 1

Page 111

from machine import Pin, PWM
from time import sleep

class Motor:
    """Single-direction motor driven by one PWM output.

    Speed is a percentage: 100 maps to a duty value of 65535.
    """

    def __init__(self, pinNo):
        # PWM on the given GPIO at 2000 Hz, starting with zero duty.
        self.gpio = pinNo
        self._on = False
        self.speed = 0

        self.pwm1 = PWM(Pin(pinNo))
        self.pwm1.freq(2000)
        self.pwm1.duty_u16(0)

    def _drive(self):
        # Mark the motor as running and apply the stored speed.
        self._on = True
        self.pwm1.duty_u16(int(65535*self.speed/100))

    def setSpeed(self, s):
        """Store speed `s` (percent) and start driving at it."""
        self.speed = s
        self._drive()

    def off(self):
        """Stop driving; the stored speed is kept."""
        self._on = False
        self.pwm1.duty_u16(0)

    def on(self):
        """Resume driving at the stored speed."""
        self._drive()

# Exercise the motor on GPIO 16: 50% for a second, stopped for a second,
# 90% for a second, then stopped.
motor=Motor(16)
motor.setSpeed(50)
sleep(1)
motor.off()
sleep(1)
motor.setSpeed(90)
sleep(1)
motor.off()

Page 116

from machine import Pin, PWM
from time import sleep

class Motor:
    """Single-direction motor driven by one PWM output.

    Speed is a percentage: 100 maps to a duty value of 65535.
    """

    def __init__(self, pinNo):
        # PWM on the given GPIO at 2000 Hz, starting with zero duty.
        self.gpio = pinNo
        self._on = False
        self.speed = 0

        self.pwm1 = PWM(Pin(pinNo))
        self.pwm1.freq(2000)
        self.pwm1.duty_u16(0)

    def _drive(self):
        # Mark the motor as running and apply the stored speed.
        self._on = True
        self.pwm1.duty_u16(int(65535*self.speed/100))

    def setSpeed(self, s):
        """Store speed `s` (percent) and start driving at it."""
        self.speed = s
        self._drive()

    def off(self):
        """Stop driving; the stored speed is kept."""
        self._on = False
        self.pwm1.duty_u16(0)

    def on(self):
        """Resume driving at the stored speed."""
        self._drive()

class BiMotor(Motor):
    """Motor with two PWM outputs on adjacent GPIOs so it can be reversed.

    Only the output currently held in `pwm1` carries the duty; the other
    one is kept at zero. Reversing swaps the two.
    """

    def __init__(self, pinNo):
        super().__init__(pinNo)
        self.forward=True
        self.pwm2=PWM(Pin(pinNo+1))
        # Give the second output the same frequency as the first instead of
        # relying on whatever it happens to be set to.
        self.pwm2.freq(2000)
        self.pwm2.duty_u16(0)

    def setForward(self,forward):
        """Select the direction; does nothing if it is already selected.

        A motor that has been switched off() stays off: the stored speed is
        only re-applied when the motor is currently on.
        """
        if self.forward==forward:
            return
        # Stop the active output before swapping so both are never driven.
        self.pwm1.duty_u16(0)
        self.pwm1,self.pwm2=self.pwm2,self.pwm1
        self.forward=forward
        if self._on:
            self.pwm1.duty_u16(int(65535*self.speed/100))

# Exercise the reversible motor on GPIO 16/17: 50% one way, reverse,
# raise to 90%, switch direction back and stop.
motor=BiMotor(16)
motor.setSpeed(50)
sleep(1)
motor.setForward(False)
sleep(1)
motor.setSpeed(90)
sleep(1)
motor.setForward(True)
motor.off()

Page 115

from machine import Pin, PWM
from time import sleep

class Servo:
    """Servo driven by a 50 Hz PWM output.

    setPosition(p) maps p to a duty value of 65535*p/1000 on top of a base
    of 2.5% of full scale.
    """

    def __init__(self, pinNo):
        self.pwm = PWM(Pin(pinNo))
        self.pwm.freq(50)
        # Initial value is the base duty figure, not a position.
        self.position = 65535*2.5/100

    def setPosition(self, p):
        """Remember `p` and apply the matching duty value."""
        self.position = p
        base = 65535*2.5/100
        span = 65535*p/1000
        self.pwm.duty_u16(int(span + base))

# Move the servo on GPIO 16 to position 100, then 50, then 0, pausing a
# second between moves.
servo=Servo(16)
servo.setPosition(100.0)
sleep(1)
servo.setPosition(50.0)
sleep(1)
servo.setPosition(0)

Page 120

from machine import Pin, PWM
import machine
from time import sleep


class Servo:
    """Servo driven by a 50 Hz PWM output, with optional inverted polarity.

    setPosition(p) maps p to a duty value of 65535*p/1000 on top of a base
    of 2.5% of full scale.
    """

    def __init__(self, pinNo):
        self.pwm = PWM(Pin(pinNo))
        self.pin = pinNo
        self.pwm.freq(50)
        # Initial value is the base duty figure, not a position.
        self.position = 65535*2.5/100

    def setPosition(self, p):
        """Remember `p` and apply the matching duty value."""
        self.position = p
        base = 65535*2.5/100
        span = 65535*p/1000
        self.pwm.duty_u16(int(span + base))

    def getSlice(self):
        """Slice number: bits 1-3 of the GPIO number."""
        return (self.pin >> 1) & 0x07

    def getChannel(self):
        """Channel number: bit 0 of the GPIO number."""
        return self.pin & 1

    def setPolarity(self, invert):
        """Set (invert truthy) or clear bit (2 + channel) of this slice's register.

        The register is at 0x40050000 + 0x14*slice; its other bits are
        written back unchanged.
        """
        register = 0x40050000 + 0x14*self.getSlice()
        bit = 0x1 << (2+self.getChannel())
        current = machine.mem32[register]
        if invert:
            machine.mem32[register] = current | bit
        else:
            machine.mem32[register] = current & ~bit


# Same movement sequence as before, with the output polarity inverted
# first.
servo = Servo(16)
servo.setPolarity(True)
servo.setPosition(100.0)
sleep(1)
servo.setPosition(50.0)
sleep(1)
servo.setPosition(0)

Page 134

from utime import sleep_ms
from machine import Pin
import machine


class StepperBi4():
    """Stepper driver using four consecutive GPIOs and half-step sequencing.

    The four outputs start at `pinA`. A phase number (0-7) selects one
    entry of the half-step table, which is written to all four pins in a
    single register access. Continuous rotation is driven by a periodic
    machine.Timer whose frequency is the step rate.
    """

    def __init__(self, pinA):
        self.phase = 0
        self.pinA = pinA
        self.timer = None
        self.forward = True
        self.speed = 0

        self.gpios = tuple([Pin(pinA, Pin.OUT), Pin(
            pinA+1, Pin.OUT), Pin(pinA+2, Pin.OUT), Pin(pinA+3, Pin.OUT)])
        self.gpios[0].high()
        # Four adjacent bits starting at pinA.
        self.gpioMask = 0xF << self.pinA
        # One 4-bit pattern per half step:
        #   0001, 0011, 0010, 0110, 0100, 1100, 1000, 1001
        self.halfstepSeq = [0x1, 0x3, 0x2, 0x6, 0x4, 0xC, 0x8, 0x9]

    def _gpio_set(self, value, mask):
        # Write the bits that differ from `value`, limited to `mask`, to
        # offset 0x01C; the current output word is read from offset 0x010.
        machine.mem32[0xd0000000 +
                      0x01C] = (machine.mem32[0xd0000000+0x010] ^ value) & mask

    def setPhase(self, phase):
        """Output the pattern for `phase` (0-7) and remember it."""
        value = self.halfstepSeq[phase] << self.pinA
        self._gpio_set(value, self.gpioMask)
        self.phase = phase

    def stepForward(self):
        """Advance one half step, wrapping from phase 7 to 0."""
        self.setPhase((self.phase+1) % 8)

    def stepReverse(self):
        """Go back one half step, wrapping from phase 0 to 7."""
        self.setPhase((self.phase-1) % 8)

    def doRotate(self, timer):
        """Timer callback: take one step in the current direction."""
        if self.forward:
            self.stepForward()
        else:
            self.stepReverse()

    def rotate(self, forward, speed):
        """Rotate continuously at `speed` steps per second, or stop when 0.

        forward: True for stepForward on each tick, False for stepReverse.
        speed:   timer frequency; 0 stops and releases the timer. Stopping
                 a motor that is not rotating is allowed and does nothing.
        """
        self.forward = forward
        self.speed = speed
        if speed == 0:
            # There may be no timer yet (or it was already released).
            if self.timer is not None:
                self.timer.deinit()
                self.timer = None
            return
        if self.timer is None:
            self.timer = machine.Timer()
        self.timer.init(freq=speed, mode=machine.Timer.PERIODIC,
                        callback=self.doRotate)


# Stepper on GPIO 16-19: rotate forward at 100 steps per second for half a
# second, stop for half a second, and repeat.
step = StepperBi4(16)
step.setPhase(0)
while True:
    step.rotate(True, 100)
    sleep_ms(500)
    step.rotate(True, 0)
    sleep_ms(500)

Page 144

from machine import Pin, SPI

# SPI 0 with clock on GPIO 6, MISO on GPIO 4 and MOSI on GPIO 7.
spi = SPI(0, sck=Pin(6), miso=Pin(4), mosi=Pin(7))
spi.init(baudrate=500_000, bits=8, polarity=0, phase=0, firstbit=SPI.MSB)

# Send three bytes of 0xAA while reading three bytes back in the same
# transfer, then show both buffers.
read = bytearray(3)
write = bytearray([0xAA, 0xAA, 0xAA])
spi.write_readinto(write, read)

print(read, write)
spi.deinit()

Page 150

from utime import sleep_ms
from machine import Pin, SPI
from time import sleep

# SPI 0 with clock on GPIO 18, MISO on GPIO 16 and MOSI on GPIO 19.
spi = SPI(0, sck=Pin(18), miso=Pin(16), mosi=Pin(19))
spi.init(baudrate=500_000, bits=8, polarity=0, phase=0, firstbit=SPI.MSB)

# Chip select is driven by hand on GPIO 17 and idles high.
CS = Pin(17, Pin.OUT)
CS.high()
sleep_ms(1)

# Send 0xD0 and read one byte back, reported as the chip ID.
write = bytearray([0xD0])
CS.low()
spi.write(write)
read = spi.read(1)
CS.high()
print("Chip ID is", hex(read[0]))

# Two address/value pairs sent in one chip-select window.
# NOTE(review): the register numbers look like a BME280-style sensor. If so,
# its datasheet has SPI writes clear bit 7 of the address (0x72 / 0x74
# rather than 0xF2 / 0xF4) - confirm against the device in use.
CS.low()
write = bytearray([0xF2, 0x01])
spi.write(write)
write = bytearray([0xF4, 0x27])
spi.write(write)
CS.high()

# Send 0xF7, wait 10 ms and read eight bytes back.
CS.low()
write = bytearray([0xF7])
spi.write(write)
sleep_ms(10)
rBuff = spi.read(8)
CS.high()

# Bytes 0-2 and 3-5 each form a 20-bit value (8 + 8 + top 4 bits);
# bytes 6-7 form a 16-bit value. The results are raw, uncompensated readings.
pressure = (rBuff[0] << 12) | (rBuff[1] << 4) | (rBuff[2] >> 4)
temperature = (rBuff[3] << 12) | (rBuff[4] << 4) | (rBuff[5] >> 4)
humidity = rBuff[6] << 8 | rBuff[7]

print("Humidity = ", humidity)
print("Pressure = ", pressure)
print("Temp. = ", temperature)

Page 155

import machine
import utime
# Read ADC input 4 every two seconds, convert the 16-bit reading to volts
# against 3.3 V full scale and print 27 - (volts - 0.706) / 0.001721.
adc = machine.ADC(4)
VOLTS_PER_COUNT = 3.3 / (65535)

while True:
    volts = adc.read_u16() * VOLTS_PER_COUNT
    print(27 - (volts - 0.706)/0.001721)
    utime.sleep(2)

Page 161

from utime import sleep_ms
from machine import Pin, SPI
from time import sleep

# SPI 0 with clock on GPIO 18, MISO on GPIO 16 and MOSI on GPIO 19;
# chip select is driven by hand on GPIO 17.
spi = SPI(0, sck=Pin(18), miso=Pin(16), mosi=Pin(19))
spi.init(baudrate=500_000, bits=8, polarity=0, phase=0, firstbit=SPI.MSB)
CS = Pin(17, Pin.OUT)
CS.high()
sleep_ms(1)
CS.low()
# Three-byte exchange: command bytes go out while the reply comes back.
write = bytearray([0x01, 0x80, 0x00])
read = bytearray(3)
spi.write_readinto(write, read)
CS.high()
# The reading is 10 bits: the low two bits of byte 1 followed by byte 2.
data = (read[1] & 0x03) << 8 | read[2]
# Scale so that the maximum reading (1023) corresponds to 3.3 V.
volts = data * 3.3 / 1023.0
print(volts)
spi.deinit()

Page 162

from utime import sleep_ms
from machine import Pin, SPI
from time import sleep


class spiADC:
    """10-bit ADC read over SPI with a manually driven chip select."""

    def __init__(self, spi, sckNo, misoNo, mosiNo, CSNo):
        # SPI bus `spi` on the given clock/MISO/MOSI GPIOs at 500 kHz.
        self.spi = SPI(spi, sck=Pin(sckNo), miso=Pin(misoNo), mosi=Pin(mosiNo))
        self.spi.init(baudrate=500_000, bits=8, polarity=0,
                      phase=0, firstbit=SPI.MSB)
        # Chip select idles high.
        self.CS = Pin(CSNo, Pin.OUT)
        self.CS.high()
        sleep_ms(1)

    def read(self, chan):
        """Return the voltage on channel `chan`, scaled so 1023 counts is 3.3 V."""
        command = bytearray([0x01, (0x08 | chan) << 4, 0x00])
        reply = bytearray(3)

        self.CS.low()
        self.spi.write_readinto(command, reply)
        self.CS.high()

        # 10-bit result: low two bits of byte 1 followed by byte 2.
        counts = (reply[1] & 0x03) << 8 | reply[2]
        return counts * 3.3 / 1023.0


# ADC on SPI 0 (clock 18, MISO 16, MOSI 19, chip select 17); print channel 1.
adc = spiADC(0, 18, 16, 19, 17)
volts = adc.read(1)
print(volts)

Page 176

from machine import Pin,I2C

# I2C bus 0 at 400 kHz with SCL on GPIO 17 and SDA on GPIO 16.
i2c0=I2C(0,scl=Pin(17),sda=Pin(16),freq=400000)

# Send the single byte 0xE7 to the device at address 0x40, then read one
# byte back and print it as the user register.
buf = bytearray([0xE7])
i2c0.writeto( 0x40, buf, True)
read= i2c0.readfrom(0x40, 1, True)
print("User Register =",read)

Page 178

from utime import sleep_ms
from machine import Pin, I2C
from time import sleep

# I2C bus 0 at 100 kHz with SCL on GPIO 17 and SDA on GPIO 16.
i2c0 = I2C(0, scl=Pin(17), sda=Pin(16), freq=100*1000)

# Send 0xE3 to address 0x40 with the stop argument False, then read three
# bytes: two data bytes and a checksum.
buf = bytearray([0xE3])
i2c0.writeto(0x40, buf, False)
read = i2c0.readfrom(0x40, 3, True)
msb = read[0]
lsb = read[1]
check = read[2]
print("msb lsb checksum =", msb, lsb, check)

Page 182

from utime import sleep_ms
from machine import Pin, I2C
from time import sleep


def crcCheck(msb, lsb, check):
    """Return the remainder of the 24-bit word msb:lsb:check under the CRC divisor.

    The word is divided bit by bit, from bit 23 down to bit 8, by the
    divisor 0x988000 (shifted right one place per step). A result of 0
    means `check` matches the two data bytes.
    """
    remainder = (msb << 16) | (lsb << 8) | check
    divisor = 0x988000
    probe = 1 << 23
    while probe > 0x80:
        if remainder & probe:
            remainder ^= divisor
        divisor >>= 1
        probe >>= 1
    return remainder


# I2C bus 0 at 100 kHz with SCL on GPIO 17 and SDA on GPIO 16.
i2c0 = I2C(0, scl=Pin(17), sda=Pin(16), freq=100*1000)

# Temperature: send 0xE3 with the stop argument False, then read two data
# bytes and a checksum.
buf = bytearray([0xE3])
i2c0.writeto(0x40, buf, False)
read = i2c0.readfrom(0x40, 3, True)
msb = read[0]
lsb = read[1]
check = read[2]
print("msb lsb checksum =", msb, lsb, check)

# The two lowest bits of the reading are masked off before conversion.
data16 = (msb << 8) | (lsb & 0xFC)
temp = (-46.85 + (175.72 * data16 / (1 << 16)))
print("Temperature C ", temp)
print("Checksum=", crcCheck(msb, lsb, check))

# Humidity: send 0xF5, then poll once a millisecond until a read succeeds.
buf = bytearray([0xF5])
i2c0.writeto(0x40, buf, True)
read = bytearray(3)
while True:
    sleep_ms(1)
    try:
        i2c0.readfrom_into(0x40, read, True)
        break
    except OSError:
        # A failed bus transfer means the result is not ready yet. Anything
        # else (including Ctrl-C) is allowed to propagate.
        continue
msb = read[0]
lsb = read[1]
check = read[2]
print("msb lsb checksum =", msb, lsb, check)
data16 = (msb << 8) | (lsb & 0xFC)
hum = -6 + (125.0 * data16) / 65536
print("Humidity ", hum)
print("Checksum=", crcCheck(msb, lsb, check))

Page 190

import rp2
from machine import Pin


# PIO program: drive the SET pin high, then low, and repeat.
@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, )
def blink():
    label("again")
    set(pins, 1)        # SET pin high
    set(pins, 0)        # SET pin low
    jmp("again")        # back to the top


# State machine 0 runs the program at 2000 Hz with GPIO 16 as its SET pin.
sm = rp2.StateMachine(0, blink, freq=2000, set_base=Pin(16))
sm.active(1)

Page 193

import rp2
from machine import Pin

# PIO program: slow square wave. After each edge a counted loop of delayed
# nop instructions stretches the time before the next edge.
@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, )
def blink():
    label("again")
    set(pins, 1)            # SET pin high
    set(x,31)               # loop counter for the high time
    label("loop1")
    nop() [31]              # nop with 31 extra delay cycles
    jmp(x_dec,"loop1")      # repeat while x is non-zero, decrementing x
    set(pins, 0)            # SET pin low
    set(x,31)               # loop counter for the low time
    label("loop2")
    nop() [31]
    jmp(x_dec,"loop2")
    jmp("again")


# State machine 0 runs the program at 2000 Hz with GPIO 16 as its SET pin.
sm = rp2.StateMachine(0, blink, freq=2000, set_base=Pin(16))
sm.active(1)

Page 194

import rp2
from machine import Pin


# PIO program: square wave whose half-period is set by a value supplied
# from the main program.
@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, )
def squarewave():
    pull(block)             # wait for the delay count sent with sm.put()
    label("again")
    set(pins, 1)            # SET pin high
    mov(x, osr)             # reload the counter from the pulled value
    label("loop1")
    jmp(x_dec, "loop1")     # count down while x is non-zero
    set(pins, 0)            # SET pin low
    mov(x, osr)
    label("loop2")
    jmp(x_dec, "loop2")
    jmp("again")


# State machine 0 at 2000 Hz with GPIO 16 as its SET pin; the delay count
# 0xFFF is sent after the state machine has been started.
sm = rp2.StateMachine(0, squarewave, freq=2000, set_base=Pin(16))
sm.active(1)
sm.put(0xFFF)

Page 196

import rp2
from machine import Pin


# PIO program: pull one word, then keep shifting it out two bits at a time
# onto the two OUT pins.
@rp2.asm_pio(out_init=(rp2.PIO.OUT_LOW, rp2.PIO.OUT_LOW))
def output():
    pull(block)             # wait for the word sent with sm.put()
    label("again")
    out(pins, 2)            # next two bits to the OUT pins
    jmp("again")


# State machine 0 at 2000 Hz, OUT pins starting at GPIO 16, shifting right.
sm = rp2.StateMachine(0, output, freq=2000, out_base=Pin(
    16), out_shiftdir=rp2.PIO.SHIFT_RIGHT)
sm.active(1)
sm.put(0xFEDCBA98)

Page 198

import rp2
from machine import Pin


# PIO program: shift data out two bits at a time onto the two OUT pins.
# autopull=True is requested, so the program contains no pull instruction.
@rp2.asm_pio(out_init=(rp2.PIO.OUT_LOW, rp2.PIO.OUT_LOW), autopull=True)
def output():
    label("again")
    out(pins, 2)            # next two bits to the OUT pins
    jmp("again")


# State machine 0 at 2000 Hz, OUT pins starting at GPIO 16, shifting right.
sm = rp2.StateMachine(0, output, freq=2000, out_base=Pin(
    16), out_shiftdir=rp2.PIO.SHIFT_RIGHT)
sm.active(1)
# Keep supplying the same word.
while True:
    sm.put(0xFEDCBA98)

Page 200

import rp2
from machine import Pin

# PIO program: square wave produced purely with side-set - the pin is set
# as a side effect of the nop and the jmp.
@rp2.asm_pio(sideset_init=rp2.PIO.OUT_LOW)
def squarewave():
    label("again")
    nop().side(1)               # side-set pin high
    jmp("again").side(0)        # side-set pin low, then loop

# State machine 0 at 2000 Hz with GPIO 16 as its side-set pin.
sm = rp2.StateMachine(0, squarewave, freq=2000,sideset_base=Pin(16))
sm.active(1)

Page 200

import rp2
from machine import Pin


# PIO program: two side-set pins driven with the values 2 and 1 in turn.
@rp2.asm_pio(sideset_init=(rp2.PIO.OUT_LOW, rp2.PIO.OUT_LOW))
def squarewave():
    label("again")
    nop().side(2)               # side-set value 0b10
    jmp("again").side(1)        # side-set value 0b01, then loop


# State machine 0 at 2000 Hz; the side-set pins start at GPIO 16.
sm = rp2.StateMachine(0, squarewave, freq=2000, sideset_base=Pin(16))
sm.active(1)

Page 202

import rp2
from machine import Pin


# PIO program: sample one input bit and hand it to the main program.
@rp2.asm_pio()
def light():
    label("again")
    in_(pins, 1)            # shift one bit in from the IN pin
    push(block)             # make it available to sm.get()
    jmp("again")


LED = Pin(25, mode=Pin.OUT)
in1 = Pin(16, mode=Pin.IN)
# State machine 0 at 2000 Hz with GPIO 16 as its IN pin.
sm = rp2.StateMachine(0, light, freq=2000, in_base=Pin(16))
sm.active(1)
# LED off when the received word is zero, on otherwise.
while True:
    flag = sm.get()
    if (flag == 0):
        LED.value(0)
    else:
        LED.value(1)

Page 204

import rp2
from machine import Pin


# PIO program: wait for the IN pin to go low and then high, and answer
# with a short pulse on the SET pin.
@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW)
def squarewave():
    label("again")
    wait(0, pin, 0)         # wait until the IN pin reads 0
    wait(1, pin, 0)         # then until it reads 1
    set(pins, 1)            # pulse the SET pin
    set(pins, 0)
    jmp("again")


in1 = Pin(16, mode=Pin.IN)
# State machine 0 at 2000 Hz: IN pin is GPIO 16, SET pin is GPIO 17.
sm = rp2.StateMachine(0, squarewave, freq=2000,
                      in_base=Pin(16), set_base=Pin(17))
sm.active(1)

Page 217

from machine import Pin
from utime import sleep_ms, ticks_us


class DHT22():
    """Bit-banged reader for a sensor on a single data line.

    After getReading() the results are available as `humidity`,
    `temperature` and `checksum` (True when the received checksum byte
    matches the low 8 bits of the sum of the four data bytes).
    """

    def __init__(self, gpio):
        # Use the GPIO number supplied by the caller; the line idles high.
        self.pin = Pin(gpio, mode=Pin.OUT)
        self.pin.high()
        self.checksum = 0
        self.temperature = 0
        self.humidity = 0
        sleep_ms(1)

    def getReading(self):
        """Trigger one conversion and decode the 40 bits that come back.

        Bits are told apart by the time between successive rising edges:
        more than 100 microseconds counts as a 1.
        """
        DHT = self.pin
        # Start signal: hold the line low for 1 ms, then release it.
        DHT.low()
        sleep_ms(1)
        DHT.init(mode=Pin.IN)
        # Skip two high-to-low-to-high cycles before the data bits.
        for i in range(2):
            while DHT.value() == 1:
                pass
            while DHT.value() == 0:
                pass

        # 32 data bits, most significant first.
        data = 0
        t1 = ticks_us()
        for i in range(32):
            while DHT.value() == 1:
                pass
            while DHT.value() == 0:
                pass
            t2 = ticks_us()
            data = data << 1
            data = data | ((t2 - t1) > 100)
            t1 = t2

        # 8 checksum bits, timed the same way.
        checksum = 0
        for i in range(8):
            while DHT.value() == 1:
                pass
            while DHT.value() == 0:
                pass
            t2 = ticks_us()
            checksum = checksum << 1
            checksum = checksum | ((t2 - t1) > 100)
            t1 = t2
        byte1 = (data >> 24 & 0xFF)
        byte2 = (data >> 16 & 0xFF)
        byte3 = (data >> 8 & 0xFF)
        byte4 = (data & 0xFF)
        self.checksum = (checksum == (byte1+byte2+byte3+byte4) & 0xFF)
        self.humidity = ((byte1 << 8) | byte2) / 10.0
        # Top bit of byte 3 is the sign of the temperature.
        neg = byte3 & 0x80
        byte3 = byte3 & 0x7F
        self.temperature = (byte3 << 8 | byte4) / 10.0
        if neg > 0:
            self.temperature = -self.temperature


# Take one reading from the sensor on GPIO 2 and print the results.
dht = DHT22(2)
dht.getReading()
print("Checksum", dht.checksum)
print("Humidity= ", dht.humidity)
print("Temperature=", dht.temperature)

Page 220

import rp2
from machine import Pin


# PIO program: issue a start pulse, then time the high part of each of the
# 40 pulses that follow. Each pulse contributes a 4-bit count, delivered to
# the main program through autopush.
@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, autopush=True, in_shiftdir=rp2.PIO.SHIFT_RIGHT)
def dht22():
    wrap_target()
    label("again")
    pull(block)             # wait for the start-pulse length from sm.put()
    set(pins, 0)            # drive the line low
    mov(x, osr)
    label("loop1")
    jmp(x_dec, "loop1")     # hold it low while x counts down
    set(pindirs, 0)         # switch the pin to input, releasing the line

    # Step over two high/low transitions before the data pulses.
    wait(1, pin, 0)
    wait(0, pin, 0)
    wait(1, pin, 0)
    wait(0, pin, 0)

    set(y, 31)              # 32 data pulses
    label("bits")
    wait(1, pin, 0)         # start of the high part
    set(x, 0)
    label("loop2")
    jmp(x_dec, "continue")  # decrement x on every pass
    label("continue")
    jmp(pin, "loop2")       # keep counting while the pin is high
    in_(x, 4)               # shift in the low 4 bits of the count
    jmp(y_dec, "bits")

    set(y, 7)               # 8 checksum pulses, timed the same way
    label("check")
    wait(1, pin, 0)
    set(x, 0)
    label("loop3")
    jmp(x_dec, "continue2")
    label("continue2")
    jmp(pin, "loop3")
    in_(x, 4)
    jmp(y_dec, "check")
    wrap()


class DHT22():
    """Sensor reader that gets its pulse timings from the `dht22` PIO program.

    After getReading() the results are available as `humidity`,
    `temperature` and `checksum` (True when the received checksum byte
    matches the low 8 bits of the sum of the four data bytes).
    """

    def __init__(self, gpio):
        # The same GPIO is used as the IN, SET and JMP pin of state machine 0.
        self.sm = rp2.StateMachine(
            0, dht22, freq=976562,
            in_base=Pin(gpio), set_base=Pin(gpio), jmp_pin=Pin(gpio))
        self.sm.active(1)

    def getByte(self):
        """Fetch one word of eight 4-bit counts and turn it into a byte.

        The lowest nibble becomes the most significant bit; a count above
        8 is a 1.
        """
        word = self.sm.get()
        value = 0
        shift = 0
        while shift < 32:
            value = value << 1
            if ((word >> shift) & 0x0F) > 8:
                value = value | 1
            shift += 4
        return value

    def getReading(self):
        """Start a conversion, read the five bytes and decode them."""
        self.sm.put(1000)
        hum_hi = self.getByte()
        print(hum_hi)
        hum_lo = self.getByte()
        temp_hi = self.getByte()
        print(hex(hum_lo))
        temp_lo = self.getByte()
        received = self.getByte()

        expected = (hum_hi+hum_lo+temp_hi+temp_lo) & 0xFF
        self.checksum = (received == expected)
        self.humidity = ((hum_hi << 8) | hum_lo) / 10.0

        # Top bit of the third byte is the sign of the temperature.
        negative = temp_hi & 0x80
        magnitude = ((temp_hi & 0x7F) << 8 | temp_lo) / 10.0
        self.temperature = magnitude
        if negative > 0:
            self.temperature = -self.temperature


# Take one reading from the sensor on GPIO 2 and print the results.
dht = DHT22(2)
dht.getReading()
print("Checksum", dht.checksum)
print("Humidity= ", dht.humidity)
print("Temperature=", dht.temperature)

Page 226

import rp2
from machine import Pin


# PIO program for a single-wire sensor read that samples the line level a fixed
# delay after each rising edge instead of timing the pulse. 32 sampled bits are
# shifted in first, then 8 more, followed by an explicit push.
@rp2.asm_pio(set_init=(rp2.PIO.OUT_LOW, rp2.PIO.OUT_LOW), autopush=True, in_shiftdir=rp2.PIO.SHIFT_LEFT)
def dht22():
    wrap_target()
    label("again")
    # Block until the host supplies the delay count.
    pull(block)
    set(pins, 0)
    mov(x, osr)
    # Busy-wait: decrement x until it reaches zero.
    label("loop1")
    jmp(x_dec, "loop1")
    # Turn the set pins into inputs so the attached device can drive the line.
    set(pindirs, 0)

    # Step past two high/low cycles before sampling starts
    # (presumably the sensor's response preamble -- confirm with the datasheet).
    wait(1, pin, 0)
    wait(0, pin, 0)
    wait(1, pin, 0)
    wait(0, pin, 0)

    # 32 iterations: wait for the rising edge, delay 25 cycles, sample one bit.
    set(y, 31)
    label("bits")
    wait(1, pin, 0)[25]
    in_(pins, 1)
    wait(0, pin, 0)
    jmp(y_dec, "bits")

    # 8 further samples; the host-side class reads these as the checksum.
    set(y, 7)
    label("check")
    wait(1, pin, 0)[25]
    # Writes 2 then 0 to the set pins just before sampling
    # (NOTE(review): looks like a timing marker on the second set pin -- confirm).
    set(pins, 2)
    set(pins, 0)
    in_(pins, 1)
    wait(0, pin, 0)
    jmp(y_dec, "check")
    # Explicit push of the final 8 bits (presumably because they do not reach
    # the autopush threshold on their own -- TODO confirm).
    push(block)
    wrap()


class DHT22():
    """Host-side reader for the bit-sampling ``dht22`` PIO program.

    ``getReading()`` stores its results in ``checksum``, ``humidity`` and
    ``temperature``.
    """

    def __init__(self, gpio):
        # One GPIO is used as the in, set and jmp pin of the state machine.
        self.sm = rp2.StateMachine(
            0,
            dht22,
            freq=490196,
            in_base=Pin(gpio),
            set_base=Pin(gpio),
            jmp_pin=Pin(gpio),
        )
        self.sm.active(1)

    def getReading(self):
        """Trigger a measurement and decode the two words the program returns."""
        self.sm.put(500)
        word = self.sm.get()
        # First word: four data bytes, most significant first.
        hum_hi, hum_lo, temp_hi, temp_lo = [
            (word >> shift) & 0xFF for shift in (24, 16, 8, 0)]
        # Second word: the checksum byte in the low 8 bits.
        received = self.sm.get() & 0xFF
        self.checksum = received == ((hum_hi + hum_lo + temp_hi + temp_lo) & 0xFF)
        self.humidity = (hum_hi << 8 | hum_lo) / 10.0
        # Bit 7 of the high temperature byte is the sign flag.
        magnitude = ((temp_hi & 0x7F) << 8 | temp_lo) / 10.0
        self.temperature = -magnitude if temp_hi & 0x80 else magnitude


dht = DHT22(2)
dht.getReading()
print("Checksum", dht.checksum)
print("Humidity= ", dht.humidity)
print("Temperature=", dht.temperature)

Page 245

from utime import sleep_ms, sleep_us
from machine import Pin
from time import sleep


class DS18B20:
    """Bit-banged 1-Wire access to a DS18B20 temperature sensor on one GPIO.

    ``getTemp()`` returns the raw 16-bit reading divided by 16, or one of the
    sentinel values -1000 (no presence pulse), -3000 (conversion timed out)
    or -2000 (scratchpad CRC mismatch).
    """

    def __init__(self, pin):
        self.pin = Pin(pin, mode=Pin.IN)
        self.pin.high()

    def presence(self):
        """Issue a reset pulse and return the line level sampled afterwards.

        Callers in this listing treat 0 as "device present" and 1 as
        "no device".
        """
        self.pin.init(mode=Pin.OUT)
        self.pin.high()
        sleep_ms(1)
        self.pin.low()
        sleep_us(480)
        self.pin.init(mode=Pin.IN)
        sleep_us(70)
        b = self.pin.value()
        sleep_us(410)
        return b

    @micropython.native
    def writeBit(self, b):
        """Write one bit; the empty loops act as short busy-wait delays."""
        if b == 1:
            delay1 = 1
            delay2 = 30
        else:
            delay1 = 30
            delay2 = 0
        self.pin.low()
        for i in range(delay1):
            pass
        self.pin.high()
        for i in range(delay2):
            pass

    def writeByte(self, byte):
        """Write ``byte`` least significant bit first, then release the line."""
        self.pin.init(mode=Pin.OUT)
        for i in range(8):
            self.writeBit(byte & 1)
            byte = byte >> 1
        self.pin.init(mode=Pin.IN)

    @micropython.native
    def readBit(self):
        """Pulse the line low, release it and return the sampled level."""
        self.pin.init(mode=Pin.OUT)
        self.pin.low()
        self.pin.high()
        self.pin.init(mode=Pin.IN)
        b = self.pin.value()
        sleep_us(60)
        return b

    def readByte(self):
        """Read eight bits, least significant first, and return them as an int."""
        byte = 0
        for i in range(8):
            byte = byte | self.readBit() << i
        return byte

    def convert(self):
        """Send command 0x44 and poll until the device returns a 1 bit.

        Returns the number of 10 ms polls that were needed, or 500 when the
        device never answered. The timeout value is what ``getTemp`` tests
        for; previously the timeout path raised on an unbound local instead.
        """
        self.writeByte(0x44)
        for i in range(500):
            sleep_ms(10)
            if self.readBit() == 1:
                return i
        return 500

    def crc8(self, data, len):
        """Return the 8-bit CRC (reflected polynomial 0x8C) of ``data[:len]``."""
        crc = 0
        for i in range(len):
            databyte = data[i]
            for j in range(8):
                temp = (crc ^ databyte) & 0x01
                crc >>= 1
                if temp:
                    crc ^= 0x8C
                databyte >>= 1
        return crc

    def getTemp(self):
        """Run a conversion, read the 9-byte scratchpad and return the reading."""
        if self.presence() == 1:
            return -1000
        self.writeByte(0xCC)
        if self.convert() == 500:
            return -3000
        self.presence()
        self.writeByte(0xCC)
        self.writeByte(0xBE)
        data = []
        for i in range(9):
            data.append(self.readByte())
        # A CRC computed over data plus its trailing CRC byte comes out as zero.
        if self.crc8(data, 9) != 0:
            return -2000
        t1 = data[0]
        t2 = data[1]
        temp1 = (t2 << 8 | t1)
        if t2 & 0x80:
            # Two's-complement 16-bit value: Python ints are unbounded, so
            # OR-ing in high bits would only make the number larger.
            temp1 -= 0x10000
        return temp1/16


dS18B20 = DS18B20(2)
if dS18B20.presence() == 1:
    print("No device")
else:
    print("Device present")

print(dS18B20.getTemp())

Page 253

import rp2
from machine import Pin
from utime import sleep_ms


# PIO program with two modes selected by the first word pulled from the TX FIFO:
#   non-zero -> "write": the word is a countdown for the initial low pulse,
#               the next word is (byte count - 1), then one word per byte;
#   zero     -> "read":  the next word is (byte count - 1) and 8 bits per byte
#               are shifted in (autopush threshold is 8, set in the decorator).
@rp2.asm_pio(set_init=rp2.PIO.OUT_HIGH, out_init=rp2.PIO.OUT_HIGH, autopush=True, push_thresh=8)
def DS1820():
    wrap_target()
    label("again")
    pull(block)
    mov(x, osr)
    # A zero selector word jumps straight to the read section.
    jmp(not_x, "read")
    label("write")
    # Hold the line at 0 while x counts down
    # (presumably the 1-Wire reset pulse -- confirm against the timing used by the host).
    set(pindirs, 1)
    set(pins, 0)
    label("loop1")
    jmp(x_dec, "loop1")
    set(pindirs, 2)[31]
    # Wait for the line to read high again before sending any data.
    wait(1, pin, 0)[31]

    # Next word: number of bytes to send, minus one.
    pull(block)
    mov(x, osr)
    label("bytes1")
    # One pulled word per byte; y counts its 8 bits.
    pull(block)
    set(y, 7)
    set(pindirs, 3)
    label("bit1")
    # Each bit: start low, drive the data bit from the OSR, then return high.
    set(pins, 0)[1]
    out(pins, 1)[31]
    set(pins, 1)[20]
    jmp(y_dec, "bit1")
    jmp(x_dec, "bytes1")
    set(pindirs, 0)[31]
    jmp("again")

    label("read")
    # Next word: number of bytes to read, minus one.
    pull(block)
    mov(x, osr)
    label("bytes2")
    set(y, 7)
    label("bit2")
    # Each bit: brief low pulse as output, switch to input, then sample the pin.
    set(pindirs, 1)
    set(pins, 0)[1]
    set(pindirs, 0)[5]
    in_(pins, 1)[10]
    jmp(y_dec, "bit2")
    jmp(x_dec, "bytes2")
    wrap()


class DS18B20:
    """Host-side driver for the ``DS1820`` PIO program."""

    def __init__(self, pin):
        # Use the GPIO the caller asked for; it was previously hard-coded to 2.
        self.sm = rp2.StateMachine(0, DS1820, freq=490196, set_base=Pin(pin), out_base=Pin(
            pin), in_base=Pin(pin), out_shiftdir=rp2.PIO.SHIFT_RIGHT, in_shiftdir=rp2.PIO.SHIFT_RIGHT)
        self.sm.active(1)

    def writeBytes(self, bytes, len):
        """Queue a write: pulse count 250, then ``len - 1``, then each byte."""
        self.sm.put(250)
        self.sm.put(len-1)
        for i in range(len):
            self.sm.put(bytes[i])

    def readBytes(self, len):
        """Queue a read of ``len`` bytes and return them as a list of ints."""
        # A zero first word selects the program's read branch.
        self.sm.put(0)
        self.sm.put(len-1)
        received = []
        for i in range(len):
            # With right shifting, the 8 sampled bits end up in the top byte.
            received.append(self.sm.get() >> 24)
        return received

    @staticmethod
    def _toCelsius(t1, t2):
        """Combine low byte ``t1`` and high byte ``t2`` into a signed reading / 16."""
        raw = (t2 << 8 | t1)
        if t2 & 0x80:
            # Two's-complement 16-bit value: subtract 2**16 to obtain the
            # negative number (OR-ing in high bits cannot do that in Python).
            raw -= 0x10000
        return raw/16

    def getTemp(self):
        """Start a conversion, wait one second, then read and decode the result."""
        self.writeBytes([0xCC, 0x44], 2)
        sleep_ms(1000)
        self.writeBytes([0xCC, 0xBE], 2)
        data = self.readBytes(9)
        return self._toCelsius(data[0], data[1])


dS18B20 = DS18B20(2)
print(dS18B20.getTemp())

Page 263

from machine import UART, Pin
from utime import sleep_ms

uart = UART(1, baudrate=9600, bits=8, parity=2, rx=Pin(5), tx=Pin(4))
SendData = bytearray("Hello World \n", "utf-8")
uart.write(SendData)
RecData = uart.read()
print(RecData)

Page 264

from machine import UART, Pin
from utime import sleep_ms

uart = UART(1, baudrate=9600, bits=8, parity=2, rx=Pin(5), tx=Pin(4))
SendData = bytearray("A"*32, "utf-8")
uart.write(SendData)
sleep_ms(500)
RecData = uart.read(32)
print(RecData)

Page 266

from machine import UART, Pin, mem32
from utime import sleep_ms


def uart_read(uart):
    """Drain the UART one byte at a time and return everything that was waiting."""
    pieces = []
    while uart.any() > 0:
        pieces.append(uart.read(1))
    return b"".join(pieces)


uart = UART(1, baudrate=9600, bits=8, parity=2, rx=Pin(5), tx=Pin(4))
SendData = bytearray("A"*65, "utf-8")
uart.write(SendData)
sleep_ms(500)
RecData = uart_read(uart)
print(RecData)
print(len(RecData))

Page 276

from utime import ticks_us, sleep_ms
from machine import UART, Pin


def uart_read(uart, timeout, interchar):
    """Read a response from ``uart`` and return it decoded as UTF-8.

    Waits up to ``timeout`` microseconds for the first byte and returns
    ``None`` if nothing arrives. After that, reading stops once no new byte
    has been seen for ``interchar`` microseconds.
    """
    # Tick counters wrap around, so elapsed time must come from ticks_diff
    # rather than plain subtraction. Imported here because the listing's
    # header only imports ticks_us.
    from utime import ticks_diff

    rxData = bytes()
    t = ticks_us()
    while uart.any() <= 0:
        if ticks_diff(ticks_us(), t) > timeout:
            return None
    while True:
        while uart.any() > 0:
            rxData += uart.read(1)
            t = ticks_us()
        if ticks_diff(ticks_us(), t) > interchar:
            break
    return rxData.decode("utf-8")


def initWiFi(uartNo):
    """Open UART ``uartNo`` at 115200 8N1 on rx=GP5 / tx=GP4, pause 100 ms, return it."""
    port = UART(
        uartNo,
        baudrate=115200,
        bits=8,
        stop=1,
        parity=None,
        rx=Pin(5),
        tx=Pin(4),
    )
    sleep_ms(100)
    return port


def ATWiFi(uart):
    """Send the bare ``AT`` command and return whatever ``uart_read`` collects."""
    uart.write("AT\r\n")
    reply = uart_read(uart, 10000, 100)
    return reply


uart = initWiFi(1)
response = ATWiFi(uart)
print(response)

Page 288

from utime import ticks_us, sleep_ms
from machine import UART, Pin


class WiFi:
    """AT-command driver for a WiFi modem attached to a UART.

    All timeouts passed to ``_read`` are in microseconds (they are compared
    against ``ticks_us`` readings). ``_read`` returns ``None`` when nothing
    arrives in time, and every caller below treats that as a failure.
    """

    def __init__(self, uartNo, tx, rx):
        self.uart = UART(uartNo, baudrate=115200, bits=8, stop=1,
                         parity=None, rx=Pin(rx), tx=Pin(tx))

    @staticmethod
    def _has(block, token):
        """Return True when a response was received and contains ``token``."""
        return block is not None and block.find(token) >= 0

    def _read(self, timeout, interchar):
        """Collect one response and return it decoded as UTF-8, or ``None``.

        Waits up to ``timeout`` for the first byte, then keeps reading until
        no byte has arrived for ``interchar``.
        """
        # Tick counters wrap, so elapsed time is computed with ticks_diff.
        from utime import ticks_diff

        # Start empty: bytearray(2000) would prefix the data with 2000 NULs.
        rxData = bytearray()
        t = ticks_us()
        while self.uart.any() <= 0:
            if ticks_diff(ticks_us(), t) > timeout:
                return None
        while True:
            while self.uart.any() > 0:
                rxData += self.uart.read(1)
                t = ticks_us()
            if ticks_diff(ticks_us(), t) > interchar:
                break
        return rxData.decode("utf-8")

    def AT(self):
        """Send ``AT`` and return the response."""
        self.uart.write(b"AT\r\n")
        return self._read(10000, 100)

    def getVersion(self):
        """Send ``AT+GMR`` and return the response."""
        self.uart.write("AT+GMR\r\n")
        return self._read(10000, 100)

    def reset(self):
        """Send ``AT+RST``; print the first response block and return the second."""
        self.uart.write("AT+RST\r\n")
        print(self._read(10000, 10000))
        return self._read(10000, 10000)

    def setUART(self, baud):
        """Send ``AT+UART_CUR`` with the given baud rate and return the response."""
        self.uart.write("AT+UART_CUR="+str(baud)+",8,1,0,0\r\n")
        return self._read(10000, 100)

    def mode(self, mode):
        """Send ``AT+CWMODE_CUR=<mode>`` and return the response."""
        command = "AT+CWMODE_CUR="+str(mode)+"\r\n"
        self.uart.write(command)
        return self._read(10000, 100)

    def scan(self):
        """Send ``AT+CWLAP`` and return the whole response as one string."""
        self.uart.write("AT+CWLAP\r\n")
        return self._read(100000, 1000*1000*20)

    def scandisplay(self):
        """Send ``AT+CWLAP`` and print response blocks until ``OK`` or a timeout."""
        self.uart.write("AT+CWLAP\r\n")
        block = self._read(10000, 100)
        print(block)
        for i in range(20):
            block = self._read(1000*1000*20, 100)
            if block is None:
                break
            if block.find("OK") >= 0:
                break
            print(block)

    def connect(self, ssid, password):
        """Send ``AT+CWJAP_CUR`` and wait for ``OK``.

        Returns True and prints "connected" only when ``OK`` was actually
        seen; returns False when the modem stopped responding or never
        confirmed within 20 response blocks.
        """
        command = "AT+CWJAP_CUR=" + '"' + ssid+'"' + ","+'"'+password+'"' + "\r\n"
        self.uart.write(command)
        connected = False
        for i in range(20):
            block = self._read(1000*1000*20, 100)
            if block is None:
                break
            if block.find("OK") >= 0:
                connected = True
                break
        print("connected" if connected else "not connected")
        return connected

    def getIP(self):
        """Send ``AT+CIFSR`` and return the response."""
        self.uart.write("AT+CIFSR\r\n")
        return self._read(100000, 100)

    def getWebPage(self, URL, page):
        """Fetch ``page`` from host ``URL`` on port 80.

        Returns the raw response block, or -1 when opening the connection or
        the send prompt fails (including a timeout).
        """
        command = 'AT+CIPSTART="TCP",'+'"'+URL+'"'+",80\r\n"
        self.uart.write(command)
        block = self._read(1000*1000*2, 1000*1000)
        if not self._has(block, "OK"):
            return -1
        command = 'GET ' + page + ' HTTP/1.1\r\nHost:'+URL+'\r\n\r\n'
        command2 = "AT+CIPSEND="+str(len(command)) + "\r\n"
        self.uart.write(command2)
        block = self._read(1000*1000*8, 1000*1000*1)
        if not self._has(block, ">"):
            return -1
        self.uart.write(command)
        block = self._read(1000*1000*5, 1000*1000*1)
        return block

    def startServer(self):
        """Serve a fixed HTML page on port 80 until a command fails.

        Loops forever answering ``+IPD`` notifications; returns -1 as soon
        as the modem fails to acknowledge a command (including a timeout).
        """
        self.uart.write("AT+CIPMUX=1\r\n")
        response = self._read(1000*1000*5, 1000)
        print("mux set", response)
        if not self._has(response, "OK"):
            return -1
        self.uart.write("AT+CIPSERVER=1,80\r\n")
        response = self._read(1000*1000*5, 1000)
        if not self._has(response, "OK"):
            return -1
        print("ready for clients", response)

        while True:
            block = self._read(1000*1000*5, 100)
            if not self._has(block, "+IPD"):
                continue

            # The connection id sits between "+IPD," and the next comma.
            n = block.find("+IPD,")+5
            m = block.find(",", n)
            linkId = block[n:m]

            data = "HTTP/1.0 200 OK\r\nServer: Pico\r\nContent-type: text/html\r\n\r\n<html><head><title>Temperature</title></head><body><p>{\"humidity\":81%,\"airtemperature\":23.5C}</p></body></html>\r\n"
            command = "AT+CIPSEND="+linkId+","+str(len(data))+"\r\n"

            self.uart.write(command)
            response = self._read(1000*1000*5, 1000*1000*1)
            if not self._has(response, ">"):
                return -1
            self.uart.write(data)
            response = self._read(1000*1000*5, 1000*1000*1)
            if not self._has(response, "OK"):
                return -1
            command = "AT+CIPCLOSE="+linkId+"\r\n"
            self.uart.write(command)
            response = self._read(1000*1000*5, 1000*1000*1)
            if not self._has(response, "OK"):
                return -1


wifi = WiFi(1, 4, 5)
print(wifi.AT())
response = wifi.mode(1)
print("mode", response)
wifi.connect("SSID", "password")
print(wifi.startServer())

Page 296

from machine import mem32, Pin
from time import sleep_ms
# Blink GP25 by writing its bit mask alternately to SIO offsets 0x014 and
# 0x018, pausing 500 ms after each write.
led = Pin(25, mode=Pin.OUT)
addrSIO = 0xd0000000
ledMask = 1 << 25
while True:
    for offset in (0x014, 0x018):
        mem32[addrSIO + offset] = ledMask
        sleep_ms(500)

Page 298

from machine import Pin
import machine


def gpio_get():
    # Return the 32-bit word at address 0xd0000000 + 0x010
    # (presumably the RP2040 SIO GPIO_OUT register -- confirm in the datasheet).
    return machine.mem32[0xd0000000+0x010]


def gpio_set(value, mask):
    """Set the GPIO lines selected by ``mask`` to the levels given in ``value``.

    Reads the word at offset 0x010 and writes the bits that must change to
    offset 0x01C (presumably the SIO GPIO_OUT and GPIO_OUT_XOR registers --
    confirm in the RP2040 datasheet). Lines outside ``mask`` are left alone.
    """
    current = machine.mem32[0xd0000000+0x010]
    # XOR first, then mask: `&` binds tighter than `^`, so without the
    # parentheses every currently-set bit outside the mask would be toggled.
    machine.mem32[0xd0000000+0x01C] = (current ^ value) & mask


pin = Pin(22, Pin.OUT)
pin = Pin(21, Pin.OUT)
value1 = 1 << 22 | 0 << 21
value2 = 0 << 22 | 1 << 21
mask = 1 << 22 | 1 << 21
while True:
    gpio_set(value1, mask)
    gpio_set(value2, mask)

Program Listings

Programmer’s Python: Everything is Data

pythondata360

 

We have decided not to make the programs available as a download because this is not the point of the book - the programs are not finished production code but something you should type in and study.

The best solution is to provide the source code of the programs in a form that can be copied and pasted into a Thonny or VS Code project. 

The only downside is that you have to create a project to paste the code into.

To do this follow the instructions in the book.

All of the programs below were copied and pasted from working programs in the IDE. They have been formatted using the built-in formatter and hence are not identical in layout to the programs in the book. This is to make copying and pasting them easier. The programs in the book are formatted to be easy to read on the page.

If anything you consider important is missing or if you have any requests or comments  contact:

This email address is being protected from spambots. You need JavaScript enabled to view it. 

Page 34

f = 1
k = 1
while(True):
    k=k+1
    f=f*k
    print(f,k)

Page 38

pi = 0
k = 1
while True:
    pi += 4 * (-1)**(k+1) * 1 / (2*k-1)
    k = k+1
    print(k, pi)

import fractions
pi = 0
k = 1
while True:
    pi += 4*(-1)**(k+1)*fractions.Fraction(1, 2*k-1)
    k = k+1
    print(k, pi, float(pi))

Page 49

import random
# The default is the value random.random() returned when this def statement
# ran, so every call that omits param1 prints that same number.
def myFunction(param1 = random.random()):
    print(param1)
myFunction()
myFunction()

import random
def myFunction(param1=None):
    """Print ``param1``, or a fresh random number when it is omitted.

    The check is against ``None`` rather than truthiness so that explicit
    falsy arguments such as 0 or 0.0 are printed instead of being replaced.
    """
    if param1 is None:
        param1 = random.random()
    print(param1)
myFunction()
myFunction()

Page 64

import datetime
import zoneinfo

# Same wall-clock time in two zones gives two different timestamps.
# The IANA key is "UTC"; a lower-case "utc" is not found on systems with a
# case-sensitive time zone database.
dt1 = datetime.datetime(2022, 1, 14, 10, 50, tzinfo=zoneinfo.ZoneInfo("Europe/Madrid"))
dt2 = datetime.datetime(2022, 1, 14, 10, 50, tzinfo=zoneinfo.ZoneInfo("UTC"))
print(dt1.timestamp())
print(dt2.timestamp())

import datetime
import zoneinfo
# replace() swaps the tzinfo but keeps the wall-clock fields unchanged.
# The IANA key is "UTC"; a lower-case "utc" is not found on systems with a
# case-sensitive time zone database.
dt1 = datetime.datetime(2022, 1, 14, 10, 50,
                        tzinfo=zoneinfo.ZoneInfo("Europe/Madrid"))
dt2 = dt1.replace(tzinfo=zoneinfo.ZoneInfo("UTC"))
print(dt1)
print(dt2)

Page 65

import datetime
import zoneinfo
# astimezone() converts to the new zone, so both objects name the same instant.
# The IANA key is "UTC"; a lower-case "utc" is not found on systems with a
# case-sensitive time zone database.
dt1 = datetime.datetime(2022, 1, 14, 10, 50, tzinfo=zoneinfo.ZoneInfo("Europe/Madrid"))
dt2 = dt1.astimezone(tz=zoneinfo.ZoneInfo("UTC"))
print(dt1)
print(dt2)

Page 140

import random

class randItIterator():
    """Endless iterator: every step yields a new ``random.random()`` value."""

    def __iter__(self):
        # An iterator is its own iterable.
        return self

    def __next__(self):
        value = random.random()
        return value

class randIt:
    """Iterable that hands out a new ``randItIterator`` for every loop."""

    def __iter__(self):
        fresh = randItIterator()
        return fresh
     
numbers=randIt()
for x in numbers:
    print(x)

Page 142

import random
class randListIterator():
    """Iterator that walks once over the ``data`` list of the object it is given."""

    def __init__(self, rndList):
        self._n = 0
        self._data = rndList.data

    def __iter__(self):
        # Required by the iterator protocol so the object also works with
        # iter(), list(), zip() and similar consumers.
        return self

    def __next__(self):
        if self._n >= len(self._data):
            raise StopIteration
        item = self._data[self._n]
        self._n = self._n+1
        return item
       
class randList:
    """Holds five random numbers in ``data``; each loop over it starts afresh."""

    def __init__(self):
        self.data = [random.random() for _ in range(5)]

    def __iter__(self):
        # A new iterator per call lets the same list be traversed repeatedly.
        return randListIterator(self)

numbers = randList()
for x in numbers:
    print(x)
for x in numbers:
    print(x)

Page 182

class Tree:
    """Binary tree of ``Node`` objects; iterating visits the nodes in pre-order."""

    class Node:
        """One tree node: a ``value`` plus optional ``left`` and ``right`` children."""

        def __init__(self, value):
            self.value = value
            self.left = self.right = None

    class TreeIter():
        """Iterator yielding a node, then its left subtree, then its right subtree."""

        def __init__(self, tree):
            self.currentNode = tree.root
            self.stack = []

        def __iter__(self):
            return self

        def __next__(self):
            visited = self.currentNode
            if visited is None:
                raise StopIteration
            # Park the right child until the left subtree has been exhausted.
            if visited.right is not None:
                self.stack.append(visited.right)
            if visited.left is not None:
                self.currentNode = visited.left
            else:
                self.currentNode = self.stack.pop() if self.stack else None
            return visited

    def __init__(self, value):
        self.root = Tree.Node(value)

    def __iter__(self):
        return Tree.TreeIter(self)

    def appendBinarySearch(self, value):
        """Insert ``value``: smaller values descend left, all others right."""
        parent = self.root
        while True:
            side = "left" if parent.value > value else "right"
            child = getattr(parent, side)
            if child is None:
                setattr(parent, side, self.Node(value))
                return
            parent = child

    def appendBreathFirst(self, value):
        """Put ``value`` in the first free slot in level order; return the new node."""
        pending = [self.root]
        while len(pending) > 0:
            parent = pending.pop(0)
            for side in ("left", "right"):
                child = getattr(parent, side)
                if child is None:
                    fresh = Tree.Node(value)
                    setattr(parent, side, fresh)
                    return fresh
                pending.append(child)


tree = Tree(0)
for i in range(1, 15):
    tree.appendBreathFirst(i)
for node in tree:
    print(node.value)

tree = Tree(8)
for i in [3, 10, 1, 6, 14, 4, 7, 13]:
    tree.appendBinarySearch(i)
for node in tree:
    print(node.value)

Page 211

import random

# XOR the message with an equally long random pad; XOR-ing the result with the
# same pad again restores the message.
msg = b"Hello World Of Secrets"
oneTime = random.randbytes(len(msg))
crypt = bytes(plain ^ key for plain, key in zip(msg, oneTime))
print(crypt)
decrypt = bytes(cipher ^ key for cipher, key in zip(crypt, oneTime))
print(decrypt)

Page 225

import pathlib

# Write four bytes to a binary file and read them back.
path = pathlib.Path("myBinFile.bin")
# Named `data` so that the built-in `bytes` type is not rebound.
data = bytes([0xAA, 0x55, 0xAA, 0x55])

# The context managers close the file even if the write or read fails.
with open(path, mode="wb") as f:
    f.write(data)

with open(path, mode="rb") as f:
    data = f.read(4)
print(data.hex(","))

Page 226

import pathlib

# Store an integer as four big-endian bytes and read it back.
path = pathlib.Path("myBinFile.bin")

myValue1 = 1234
# The context managers close the file even if the write or read fails.
with path.open(mode="wb") as f:
    f.write(myValue1.to_bytes(4, byteorder="big"))

with path.open(mode="rb") as f:
    b = f.read(4)

myValue2 = int.from_bytes(b, byteorder="big")
print(myValue2)
 
 
import pathlib

# Store a UTF-8 encoded string in a binary file and read it back.
path = pathlib.Path("myBinFile.bin")

myString1 = "Hello World"
b = myString1.encode(encoding="utf8")

# The context managers close the file even if the write or read fails.
with path.open(mode="wb") as f:
    n = f.write(b)

# n is the byte count reported by write(), so exactly that much is read back.
with path.open(mode="rb") as f:
    b = f.read(n)

myString2 = b.decode(encoding="utf-8")
print(myString2)

Page 227

import pathlib

# Store a string preceded by its byte length (4 bytes, big-endian) so the
# reader knows how much to read back.
path = pathlib.Path("myBinFile.bin")

myString1 = "Hello World"

b = myString1.encode(encoding="utf8")
n = len(b)

# The context managers close the file even if the write or read fails.
with path.open(mode="wb") as f:
    f.write(n.to_bytes(4, byteorder="big"))
    m = f.write(b)
if m != n:
    print("file error")

with path.open(mode="rb") as f:
    b = f.read(4)
    n = int.from_bytes(b, byteorder="big")
    b = f.read(n)

myString2 = b.decode(encoding="utf-8")
print(myString2)

Page 230

import pathlib
import struct

def floatToBytes(f):
    """Pack ``f`` as an 8-byte big-endian double."""
    packer = struct.Struct(">d")
    return packer.pack(f)
def floatFromBytes(b):
    """Unpack an 8-byte big-endian double from ``b`` and return the float."""
    (value,) = struct.unpack(">d", b)
    return value

path = pathlib.Path("myBinFile.bin")

myFloat = 3.14159

f = path.open(mode="wb")
m = f.write(floatToBytes(myFloat))
f.close()

f = path.open(mode="rb")
b = f.read(8)
f.close()

myFloat = floatFromBytes(b)
print(myFloat)

Page 234

import dataclasses
import struct
import pathlib


@dataclasses.dataclass
class person:
    """Record that can be saved to and loaded from a fixed-size binary file."""
    name: str = ""
    id: int = 0
    score: float = 0.0
    # No annotation, so this is a shared class attribute, not a dataclass field.
    # Layout: 25-byte name, 4-byte int, 8-byte double, big-endian (37 bytes).
    myStruct = struct.Struct(">25sld")

    def save(self, path):
        """Write this record to ``path``, replacing any existing file."""
        b = person.myStruct.pack(self.name.encode(encoding="utf8"),
                                 self.id, self.score)
        with path.open(mode="wb") as f:
            f.write(b)

    def load(self, path):
        """Read one record from ``path`` into this object."""
        with path.open(mode="rb") as f:
            raw = f.read(person.myStruct.size)
        a, b, c = person.myStruct.unpack(raw)
        # Strip only the NUL padding. "\0x00" is the three characters NUL,
        # 'x' and '0', which also ate names ending in 'x' or '0'.
        self.name = a.decode(encoding="utf8").rstrip("\x00")
        self.id = b
        self.score = c

path=pathlib.Path("myBinFile.bin")

me=person("mike",42,3.14)
me.save(path)

me2=person()
me2.load(path)
print(me2)
 

Page 235

import dataclasses
import struct
import pathlib
import os

@dataclasses.dataclass
class person:
    """Fixed-size record that can be appended to, and read by index from, a file."""
    name: str = ""
    id: int = 0
    score: float = 0.0
    # No annotations, so these are shared class attributes, not dataclass fields.
    # Layout: 25-byte name, 4-byte int, 8-byte double, big-endian.
    myStruct = struct.Struct(">25sld")
    n = myStruct.size  # 37 bytes per record

    def save(self, f):
        """Write this record at the current position of the open file ``f``."""
        b = person.myStruct.pack(
            self.name.encode(encoding="utf8"), self.id, self.score)
        f.write(b)

    def load(self, f, id):
        """Load record number ``id`` (counting from 0) from the open file ``f``."""
        f.seek(id*person.n, 0)
        raw = f.read(person.n)
        a, b, c = person.myStruct.unpack(raw)
        # Strip only the NUL padding. "\0x00" is the three characters NUL,
        # 'x' and '0', which also ate names ending in 'x' or '0'.
        self.name = a.decode(encoding="utf8").rstrip("\x00")
        self.id = b
        self.score = c



# Write 50 records to a fresh file, then read two of them back by index.
path = pathlib.Path("myBinFile.bin")
# Start clean; unlike os.remove this does not fail on the first run, when
# the file does not exist yet.
path.unlink(missing_ok=True)

me = person("mike", 42, 3.14)
with path.open(mode="ab") as f:
    for i in range(50):
        me.id = i
        me.save(f)

with path.open(mode="r+b") as f:
    me2 = person()
    me2.load(f, 20)
    print(me2)
    me2.load(f, 5)
    print(me2)

Page 235

import dataclasses
import struct
import pathlib
import os

@dataclasses.dataclass
class person:
    """Fixed-size record that can be appended to, and read by index from, a file."""
    name: str = ""
    id: int = 0
    score: float = 0.0
    # No annotations, so these are shared class attributes, not dataclass fields.
    # Layout: 25-byte name, 4-byte int, 8-byte double, big-endian.
    myStruct = struct.Struct(">25sld")
    n = myStruct.size  # 37 bytes per record

    def save(self, f):
        """Write this record at the current position of the open file ``f``."""
        b = person.myStruct.pack(
            self.name.encode(encoding="utf8"), self.id, self.score)
        f.write(b)

    def load(self, f, id):
        """Load record number ``id`` (counting from 0) from the open file ``f``."""
        f.seek(id*person.n, 0)
        raw = f.read(person.n)
        a, b, c = person.myStruct.unpack(raw)
        # Strip only the NUL padding. "\0x00" is the three characters NUL,
        # 'x' and '0', which also ate names ending in 'x' or '0'.
        self.name = a.decode(encoding="utf8").rstrip("\x00")
        self.id = b
        self.score = c



# Write 50 records to a fresh file, then read two of them back by index.
path = pathlib.Path("myBinFile.bin")
# Start clean; unlike os.remove this does not fail on the first run, when
# the file does not exist yet.
path.unlink(missing_ok=True)

me = person("mike", 42, 3.14)
with path.open(mode="ab") as f:
    for i in range(50):
        me.id = i
        me.save(f)

with path.open(mode="r+b") as f:
    me2 = person()
    me2.load(f, 20)
    print(me2)
    me2.load(f, 5)
    print(me2)

Page 248

import pathlib
import dataclasses
import csv

@dataclasses.dataclass
class person:
    name:str=""
    id:int=0
    score:float=0.0

me = person("mike", 42, 3.145)
path = pathlib.Path("myTextFile.csv")

with path.open(mode="wt", newline="") as f:
    peopleWriter = csv.writer(f)
    for i in range(43):
        peopleWriter.writerow([me.name, i, me.score])

with path.open(mode="rt") as f:
    peopleReader = csv.reader(f)
    for row in peopleReader:
        print(row)

Page 251

import pathlib
import dataclasses
import csv

@dataclasses.dataclass
class person:
    name:str=""
    id:int=0
    score:float=0.0
    def values(self):
        return [self.name,str(self.id),str(self.score)]
   
class MyCSV(csv.Dialect):
    quoting =csv.QUOTE_NONNUMERIC
    delimiter      = ','
    quotechar      = '"'
    lineterminator = '\r\n'

me=person("mike",42,3.145)
path=pathlib.Path("myTextFile.csv")

with path.open(mode="wt",newline="") as f:
    peopleWriter=csv.writer(f,dialect=MyCSV)
    for i in range(43):
        peopleWriter.writerow([me.name,i,me.score])
       
with path.open(mode="rt") as f:
    peopleReader=csv.reader(f,dialect=MyCSV)
    for row in peopleReader:
        print(row)

Page 256

import pathlib
import dataclasses
import json

@dataclasses.dataclass
class person:
    name:str=""
    id:int=0
    score:float=0.0
   

me=person("mike",42,3.145)

path=pathlib.Path("myTextFile.json")

with path.open(mode="wt") as f:
    s=json.dumps(dataclasses.asdict(me))
    print(s,file=f)
       
with path.open(mode="rt") as f:
    for line in f:
        data= json.loads(line)
        me1=person(**data)
        print(me1)

Page 260

from ast import parse
import pathlib
import xml.etree.ElementTree

data="""
<Books>
    <Book rating="5">
First book on Python
        <Title>
Programmer's Python: Everything is an Object
        </Title>
        <Author>
Mike James
        </Author >
        <Publisher >
IO Press
        </Publisher>
    </Book>
    <Book rating="5">
        <Title>
The Programmer’s Guide To Theory
        </Title>
        <Author>
Mike James
        </Author >
        <Publisher >
IO Press
        </Publisher>
    </Book>
</Books>
"""

path=pathlib.Path("myTextFile.xml")
rootElement=xml.etree.ElementTree.fromstring(data)
tree=xml.etree.ElementTree.ElementTree(rootElement)
tree.write(path)

Page 265

import pathlib
import pickle

path=pathlib.Path("myBinaryFile.pickle")
data=[1,2,{"name":"mike","id":42,"score":3.145}]
with path.open(mode="wb") as f:
    pickle.dump(data,f)
with path.open(mode="rb") as f:
    print(pickle.load(f))
import pathlib
import dataclasses
import pickle

@dataclasses.dataclass
class Person:
    name:str=""
    id:int=0
    score:float=0.0
    def display(self):
        print(self.name)

me=Person("mike",42,3.14)
path=pathlib.Path("myBinaryFile.pickle")
with path.open(mode="wb") as f:
    pickle.dump(me,f)
with path.open(mode="rb") as f:
    me2=pickle.load(f)
me2.display()

Page 268

import pathlib
import pickle

class readFile():
    """Line reader over ``myFile.txt`` whose read position survives pickling."""

    def __init__(self):
        self.path = pathlib.Path("myFile.txt")
        self.f = self.path.open(mode="rt")
        self.pos = self.f.tell()

    def get(self):
        """Return the next line of the file."""
        line = self.f.readline()
        return line

    def __getstate__(self):
        # Record the current offset, then pickle everything except the open
        # file object, which cannot be pickled.
        self.pos = self.f.tell()
        return {key: val for key, val in self.__dict__.items() if key != 'f'}

    def __setstate__(self, state):
        # Restore the saved attributes, reopen the file and seek back to
        # where the pickled object had got to.
        for key, val in state.items():
            setattr(self, key, val)
        handle = self.path.open(mode="rt")
        handle.seek(self.pos)
        self.f = handle
 
read1=readFile()
print(read1.get())

path=pathlib.Path("myTemp.pickle")
with path.open(mode="wb") as f:
    pickle.dump(read1,f)

with path.open(mode="rb") as f:
    read2=pickle.load(f)
print(read2.get())

Page 287

from math import log2
import numbers
import collections.abc


class Tree(collections.abc.Sequence):
    """Binary tree filled breadth-first and indexable by breadth-first position.

    Index 0 is the root, 1 and 2 are its left and right children, and so on.
    Indexing and slicing return ``Node`` objects; item assignment replaces
    the ``value`` stored in existing nodes.
    """

    class Node:
        def __init__(self, value):
            self.left = None
            self.right = None
            self.value = value

    def __init__(self, value):
        self.root = Tree.Node(value)
        self.len = 1

    def appendBreathFirst(self, value):
        """Add ``value`` in the first free slot in level order; return the new node."""
        queue = [self.root]
        while len(queue) > 0:
            node = queue.pop(0)
            if node.left is None:
                node.left = Tree.Node(value)
                self.len += 1
                return node.left
            queue.append(node.left)
            if node.right is None:
                node.right = Tree.Node(value)
                self.len += 1
                return node.right
            queue.append(node.right)

    @staticmethod
    def _turns(key):
        """Return the root-to-node path for position ``key`` (True means left).

        Uses integer arithmetic only, so it stays exact for any index size,
        unlike a float ``log2`` and ``/``.
        """
        pos = key + 1
        row = pos.bit_length() - 1
        return [((pos >> (row - r)) - 1) % 2 == 1 for r in range(1, row + 1)]

    def getNode(self, key):
        """Return the node at breadth-first position ``key``.

        Raises IndexError when ``key`` is negative or not less than ``len(self)``.
        """
        if key >= len(self) or key < 0:
            raise IndexError('Index out of range')
        currentNode = self.root
        for goLeft in Tree._turns(key):
            currentNode = currentNode.left if goLeft else currentNode.right
        return currentNode

    def setNode(self, key, node):
        """Attach ``node`` (with its own children) at position ``key``.

        Position 0 replaces the root. ``self.len`` is not updated here.
        Raises IndexError for a negative ``key``.
        """
        if key < 0:
            raise IndexError('Index out of range')
        turns = Tree._turns(key)
        if not turns:
            # Position 0 is the root itself, not a child of it.
            self.root = node
            return
        currentNode = self.root
        for goLeft in turns[:-1]:
            currentNode = currentNode.left if goLeft else currentNode.right
        if turns[-1]:
            currentNode.left = node
        else:
            currentNode.right = node

    def __len__(self):
        return self.len

    def __getitem__(self, key):
        if isinstance(key, slice):
            return [self.getNode(i) for i in range(0, len(self))[key]]
        if isinstance(key, int):
            return self.getNode(key)
        raise TypeError("invalid index")

    def __setitem__(self, key, value):
        if isinstance(key, slice) and isinstance(value, list):
            indices = range(0, len(self))[key]
            if len(value) < len(indices):
                raise IndexError("not enough values for slice")
            # zip() reads the values without consuming the caller's list;
            # surplus values are ignored.
            for i, item in zip(indices, value):
                self.getNode(i).value = item
        elif isinstance(key, int):
            self.getNode(key).value = value
        else:
            raise TypeError("invalid index or value")

    def __imul__(self, m):
        # Multiplies every stored value in place; non-numbers are ignored.
        if isinstance(m, numbers.Number):
            for i in range(0, len(self)):
                self.getNode(i).value *= m
        return self

    def __eq__(self, otherTree):
        if not isinstance(otherTree, Tree):
            return NotImplemented

        if len(self) != len(otherTree):
            return False
        for i in range(0, len(self)):
            if self.getNode(i).value != otherTree.getNode(i).value:
                return False
        return True

    def __hash__(self):
        # Hash of the values in breadth-first order, matching __eq__.
        temp = []
        for i in range(0, len(self)):
            temp.append(self.getNode(i).value)
        return hash(tuple(temp))

Program Listings

Programmer’s Python: Async

pythonAsync360

 

We have decided not to make the programs available as a download because this is not the point of the book - the programs are not finished production code but something you should type in and study.

The best solution is to provide the source code of the programs in a form that can be copied and pasted into a Thonny or VS Code project. 

The only downside is that you have to create a project to paste the code into.

To do this follow the instructions in the book.

All of the programs below were copied and pasted from working programs in the IDE. They have been formatted using the built-in formatter and hence are not identical in layout to the programs in the book. This is to make copying and pasting them easier. The programs in the book are formatted to be easy to read on the page.

If anything you consider important is missing or if you have any requests or comments  contact:

This email address is being protected from spambots. You need JavaScript enabled to view it. 

Page 49

import multiprocessing


def myProcess():
    print("Hello Process World")


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=myProcess)
    p1.start()

 

Page 52

import multiprocessing


def myProcess():
    while True:
        pass


if __name__ == '__main__':
    p0 = multiprocessing.current_process()
    p1 = multiprocessing.Process(target=myProcess, daemon=True)
    p2 = multiprocessing.Process(target=myProcess, daemon=False)
    p1.start()
    p2.start()
    print(p0.pid)
    print(p1.pid)
    print(p2.pid)
    print("ending")

Page 55

import multiprocessing
import multiprocessing.connection
import random
import time


def myProcess():
    time.sleep(random.randrange(1, 4))


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=myProcess)
    p2 = multiprocessing.Process(target=myProcess)
    p3 = multiprocessing.Process(target=myProcess)
    p1.start()
    p2.start()
    p3.start()

    waitList = [p1.sentinel, p2.sentinel, p3.sentinel]
    res = multiprocessing.connection.wait(waitList)
    print(res)
    print(waitList.index(res[0])+1)

Page 57

import time
import multiprocessing


def myPi(m, n):
    """Print four times the sum of the series terms k = m .. n inclusive.

    Term k is +1/(2k-1) for odd k and -1/(2k-1) for even k.
    """
    total = 0
    k = m
    while k <= n:
        sign = -1 if k % 2 == 0 else 1
        total += sign / (2 * k - 1)
        k += 1
    print(4*total)


if __name__ == '__main__':
    N = 10000000
    p1 = multiprocessing.Process(target=myPi, args=(N//4+1, N//4*2))
    p2 = multiprocessing.Process(target=myPi, args=(N//4*2+1, N//4*3))
    p3 = multiprocessing.Process(target=myPi, args=(N//4*3+1, N))

    t1 = time.perf_counter()
    p1.start()
    p2.start()
    p3.start()
    myPi(1, N//4)
    p1.join()
    p2.join()
    p3.join()
    t2 = time.perf_counter()
    print((t2-t1)*1000)

Page 64

import time
import multiprocessing


def myPi(m, n):
    """Print 4 times the partial sum of the alternating odd-reciprocal series.

    Adds the terms for k = m..n inclusive, where term k is +1/(2k-1) for
    odd k and -1/(2k-1) for even k, then prints the sum multiplied by 4.
    The range is inclusive so that adjacent calls such as myPi(1, h) and
    myPi(h+1, N) cover every term exactly once.
    """
    pi = 0
    # Start with the opposite of term m's sign: the flip at the top of the
    # loop then gives +1 for odd k and -1 for even k whatever m is.
    s = -1 if m % 2 else 1
    for k in range(m, n + 1):
        s = -s
        pi += s / (2 * k - 1)
    print(4*pi)


if __name__ == '__main__':
    # time and multiprocessing are imported at the top of this listing, so
    # they are not imported again here; multiprocessing.connection was unused.
    N = 1000000
    # The child process handles the second half of the terms while this
    # process handles the first half.
    p1 = multiprocessing.Process(target=myPi, args=(N//2+1, N))
    t1 = time.perf_counter()
    p1.start()
    myPi(1, N//2)
    p1.join()
    t2 = time.perf_counter()
    # Elapsed wall-clock time in milliseconds.
    print((t2-t1)*1000)

Page 72

import threading


def myThread():
    """Spin forever without doing any work; this function never returns."""
    while True:
        pass


t0 = threading.main_thread()
t1 = threading.Thread(target=myThread, daemon=True)
t2 = threading.Thread(target=myThread, daemon=False)
t1.start()
t2.start()
print(t0.native_id)
print(t1.native_id)
print(t2.native_id)
print("ending")

Page 74 

import threading


def myThread(sym):
    """Copy sym into a local variable and print that variable forever."""
    temp = sym
    while True:
        print(temp)


t1 = threading.Thread(target=myThread, args=("A",))
t2 = threading.Thread(target=myThread, args=("B",))
t1.start()
t2.start()
t1.join()

Page 75
import threading


def myThread(sym):
    """Store sym in the function attribute myThread.temp and print it forever.

    temp lives on the function object itself, so every thread running
    myThread reads and writes the same attribute.
    """
    myThread.temp = sym
    while True:
        print(myThread.temp)


myThread.temp = ""

t1 = threading.Thread(target=myThread, args=("A",))
t2 = threading.Thread(target=myThread, args=("B",))
t1.start()
t2.start()
t1.join()
Page 76
import threading


class MyClass():
    temp = ""


def myThread(sym):
    """Create a new MyClass instance, set its temp to sym and print it forever."""
    myObject = MyClass()
    # Assigning through the instance creates an instance attribute; the
    # class attribute MyClass.temp is left unchanged.
    myObject.temp = sym
    while True:
        print(myObject.temp)


t1 = threading.Thread(target=myThread, args=("A",))
t2 = threading.Thread(target=myThread, args=("B",))
t1.start()
t2.start()
t1.join()

Page 77

import threading

myObject = threading.local()


def myThread(sym):
    """Store sym on the module-level threading.local object and print it forever."""
    myObject.temp = sym
    while True:
        print(myObject.temp)


t1 = threading.Thread(target=myThread, args=("A",))
t2 = threading.Thread(target=myThread, args=("B",))
t1.start()
t2.start()
t1.join()
Page 78

 

import threading
import time


def myPi(m, n):
    """Print 4 times the sum of the series terms k = m..n inclusive.

    Odd k contributes +1/(2k-1), even k contributes -1/(2k-1).
    """
    running = 0
    for k in range(m, n + 1):
        sign = -1 if k % 2 == 0 else 1
        running = running + sign / (2 * k - 1)
    print(4 * running)


N = 10000000
thread1 = threading.Thread(target=myPi, args=(N//2+1, N))
t1 = time.perf_counter()
thread1.start()
myPi(1, N//2)
thread1.join()
t2 = time.perf_counter()
print((t2-t1)*1000)
 
Page 79
import urllib.request
import time


def download():
    """Fetch http://www.example.com/ and decode the body as UTF-8.

    The decoded text is only bound to a local name; nothing is returned.
    """
    with urllib.request.urlopen('http://www.example.com/') as f:
        html = f.read().decode('utf-8')


t1 = time.perf_counter()
download()
download()
t2 = time.perf_counter()
print((t2-t1)*1000)
Page 80
import urllib.request
import threading
import time


def download():
    """Fetch http://www.example.com/ and decode the body as UTF-8.

    The decoded text is only bound to a local name; nothing is returned.
    """
    with urllib.request.urlopen('http://www.example.com/') as f:
        html = f.read().decode('utf-8')


thread1 = threading.Thread(target=download)
t1 = time.perf_counter()
thread1.start()
download()
thread1.join()
t2 = time.perf_counter()
print((t2-t1)*1000)
Page 81
import threading
import time


def test():
    """Loop forever, calling time.sleep(0) on every pass; never returns."""
    while (True):
        time.sleep(0)
        pass


thread1 = threading.Thread(target=test)
thread2 = threading.Thread(target=test)
thread1.start()
thread2.start()
thread1.join()

Page 87 

from cmath import sqrt
import threading
import time

myCounter = 0


def count():
    """Add 1 to the global myCounter 100000 times without any locking.

    Each pass reads myCounter into temp, calls sqrt(2), and only then
    writes temp back, so the read and the write are separate steps.
    """
    global myCounter
    for i in range(100000):
        temp = myCounter+1
        # The result is unused; the call just sits between the read and the write.
        x = sqrt(2)
        myCounter = temp


thread1 = threading.Thread(target=count)
thread2 = threading.Thread(target=count)
t1 = time.perf_counter()

thread1.start()
thread2.start()
thread1.join()
thread2.join()

t2 = time.perf_counter()
print((t2-t1)*1000)
print(myCounter)

 

Page 90 (top)
from cmath import sqrt
import threading
import time

myCounter = 0
countlock = threading.Lock()


def count():
    """Add 1 to the global myCounter 100000 times, one locked update at a time.

    countlock is taken and released around every individual
    read-modify-write of myCounter.
    """
    global myCounter
    for i in range(100000):
        # "with" releases countlock even if the body raises, which a bare
        # acquire()/release() pair does not.
        with countlock:
            temp = myCounter+1
            x = sqrt(2)
            myCounter = temp


thread1 = threading.Thread(target=count)
thread2 = threading.Thread(target=count)
t1 = time.perf_counter()

thread1.start()
thread2.start()
thread1.join()
thread2.join()

t2 = time.perf_counter()
print((t2-t1)*1000)
print(myCounter)
 
Page 90 (Bottom)
from cmath import sqrt
import threading
import time

myCounter = 0
countlock = threading.Lock()


def count():
    """Add 1 to the global myCounter 100000 times under a single lock hold.

    countlock is taken once before the loop and released once after it,
    so the whole loop runs while the lock is held.
    """
    global myCounter
    # "with" releases countlock even if the loop raises, which a bare
    # acquire()/release() pair does not.
    with countlock:
        for i in range(100000):
            temp = myCounter+1
            x = sqrt(2)
            myCounter = temp


thread1 = threading.Thread(target=count)
thread2 = threading.Thread(target=count)
t1 = time.perf_counter()

thread1.start()
thread2.start()
thread1.join()
thread2.join()

t2 = time.perf_counter()
print((t2-t1)*1000)
print(myCounter)
Page 91
import multiprocessing
import time


def myProcess(lock):
    """Hold lock for four seconds.

    lock is used as a context manager so it is released even if the
    sleep is interrupted by an exception.
    """
    with lock:
        time.sleep(4)


if __name__ == '__main__':
    myLock = multiprocessing.Lock()
    p1 = multiprocessing.Process(target=myProcess, args=(myLock,))
    p2 = multiprocessing.Process(target=myProcess, args=(myLock,))
    t1 = time.perf_counter()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    t2 = time.perf_counter()
    print((t2-t1)*1000)
Page 92-93

import threading
import time

countlock1 = threading.Lock()
countlock2 = threading.Lock()
myCounter1 = 0
myCounter2 = 0


def count1():
    """Increment myCounter1 and myCounter2 100000 times each.

    On every pass countlock1 is acquired first and countlock2 second,
    with countlock2 taken while countlock1 is still held.
    """
    global myCounter1
    global myCounter2
    for i in range(100000):
        countlock1.acquire()
        myCounter1 += 1
        # Blocks until countlock2 is free, while still holding countlock1.
        countlock2.acquire()
        myCounter2 += 1
        countlock2.release()
        countlock1.release()


def count2():
    """Increment myCounter2 and myCounter1 100000 times each.

    On every pass countlock2 is acquired first and countlock1 second,
    which is the opposite order to count1; if each thread holds its first
    lock and waits for its second, neither can proceed.
    """
    global myCounter1
    global myCounter2
    for i in range(100000):
        countlock2.acquire()
        myCounter2 += 1
        # Blocks until countlock1 is free, while still holding countlock2.
        countlock1.acquire()
        myCounter1 += 1
        countlock1.release()
        countlock2.release()


thread1 = threading.Thread(target=count1)
thread2 = threading.Thread(target=count2)
t1 = time.perf_counter()

thread1.start()
thread2.start()
thread1.join()
thread2.join()

t2 = time.perf_counter()
print((t2-t1)*1000)
print(myCounter1)

Page 95

import threading
import time

countlock1 = threading.Lock()
countlock2 = threading.Lock()
myCounter1 = 0
myCounter2 = 0


def count1():
    """Increment myCounter1 100000 times, and myCounter2 when its lock is free.

    countlock1 is held for each pass. countlock2 is requested with a
    0.01 s timeout; if it is not obtained in that time the myCounter2
    increment for that pass is skipped.
    """
    global myCounter1
    global myCounter2
    for i in range(100000):
        countlock1.acquire()
        myCounter1 += 1
        # acquire() returns False on timeout, so the body runs only with the lock held.
        if countlock2.acquire(timeout=0.01):
            myCounter2 += 1
            countlock2.release()
        countlock1.release()


def count2():
    """Increment myCounter2 100000 times, and myCounter1 when its lock is free.

    countlock2 is held for each pass. countlock1 is requested with a
    0.01 s timeout; if it is not obtained in that time the myCounter1
    increment for that pass is skipped.
    """
    global myCounter1
    global myCounter2
    for i in range(100000):
        countlock2.acquire()
        myCounter2 += 1
        # acquire() returns False on timeout, so the body runs only with the lock held.
        if countlock1.acquire(timeout=0.01):
            myCounter1 += 1
            countlock1.release()
        countlock2.release()


thread1 = threading.Thread(target=count1)
thread2 = threading.Thread(target=count2)
t1 = time.perf_counter()

thread1.start()
thread2.start()
thread1.join()
thread2.join()

t2 = time.perf_counter()
print((t2-t1)*1000)
print(myCounter1)

Page 105

import threading
import time


def myPi(m, n):
    """Add 4 times the series terms k = m..n inclusive to the global PI.

    The partial sum is built in a local variable; only the final addition
    to PI is done while holding the module-level myLock.
    """
    global PI
    partial = 0
    for k in range(m, n + 1):
        sign = -1 if k % 2 == 0 else 1
        partial = partial + sign / (2 * k - 1)
    with myLock:
        PI += partial * 4


# Shared accumulator that both calls to myPi add their partial result to.
PI = 0
N = 10000000
# Guards the update of PI inside myPi.
myLock = threading.Lock()
# The second half of the terms is summed on a worker thread while the
# main thread sums the first half.
thread1 = threading.Thread(target=myPi, args=(N//2+1, N))
t1 = time.perf_counter()
thread1.start()
myPi(1, N//2)
thread1.join()
t2 = time.perf_counter()
# Elapsed wall-clock time in milliseconds, then the combined result.
print((t2-t1)*1000)
print(PI)

Page 107

 
import threading
import time


def myThread1():
    """Sleep for one second, then wait for thread2 to finish."""
    time.sleep(1)
    # thread2 is the module-level Thread whose target is myThread2, which in turn joins thread1.
    thread2.join()


def myThread2():
    """Sleep for one second, then wait for thread1 to finish."""
    time.sleep(1)
    # thread1 is the module-level Thread whose target is myThread1, which in turn joins thread2.
    thread1.join()


thread1 = threading.Thread(target=myThread1)
thread2 = threading.Thread(target=myThread2)
thread1.start()
thread2.start()

Page 108

 
import threading
import time

joinLock = threading.Lock()


def myThread1():
    """Sleep for one second, then release the module-level joinLock."""
    time.sleep(1)
    # joinLock is acquired by the main thread before this thread starts,
    # so this release is what lets the main thread's second acquire() return.
    joinLock.release()


thread1 = threading.Thread(target=myThread1)
joinLock.acquire()
thread1.start()
joinLock.acquire()

Page 109

 
import urllib.request
import threading


def download(html, lock):
    """Download http://www.example.com/, append the text to html, release lock.

    html -- list that receives the page body decoded as UTF-8.
    lock -- object with a release() method, called once when the download
            attempt is over, whether or not it succeeded.

    Any exception raised by the download still propagates to the caller.
    """
    try:
        with urllib.request.urlopen('http://www.example.com/') as f:
            html.append(f.read().decode('utf-8'))
    finally:
        # Release on failure as well, so a thread blocked in acquire() is
        # not left waiting forever when the request fails.
        lock.release()


html1 = []
html2 = []
mySem = threading.Semaphore()

thread1 = threading.Thread(target=download, args=(html1, mySem))
thread2 = threading.Thread(target=download, args=(html2, mySem))


mySem.acquire()
thread1.start()
thread2.start()
mySem.acquire()

if (html1 != []):
    print("thread1")
if (html2 != []):
    print("thread2")

Page 111

 
import urllib.request
import threading


def download(html, event):
    """Download http://www.example.com/, append the text to html, then set event.

    event.set() is reached only after the page has been read and decoded.
    """
    with urllib.request.urlopen('http://www.example.com/') as f:
        html.append(f.read().decode('utf-8'))
    event.set()


html1 = []
html2 = []
myEvent = threading.Event()

thread1 = threading.Thread(target=download, args=(html1, myEvent))
thread2 = threading.Thread(target=download, args=(html2, myEvent))


myEvent.clear()
thread1.start()
thread2.start()
myEvent.wait()

if (html1 != []):
    print("thread1")
if (html2 != []):
    print("thread2")

Page 112

 
import urllib.request
import threading


def download(html, barrier):
    """Download http://www.example.com/, append the text to html, then wait at barrier."""
    with urllib.request.urlopen('http://www.example.com/') as f:
        html.append(f.read().decode('utf-8'))
    # Blocks here until the barrier's required number of parties have called wait().
    barrier.wait()


html1 = []
html2 = []
myBarrier = threading.Barrier(3)

thread1 = threading.Thread(target=download, args=(html1, myBarrier))
thread2 = threading.Thread(target=download, args=(html2, myBarrier))

thread1.start()
thread2.start()
myBarrier.wait()

if (html1 != []):
    print("thread1")
if (html2 != []):
    print("thread2")

Page 113

 
import urllib.request
import threading


def download(html, barrier):
    """Download http://www.example.com/, append the text to html, then wait at barrier.

    A broken or aborted barrier is expected in this listing and is
    ignored; any other exception from wait() propagates.
    """
    with urllib.request.urlopen('http://www.example.com/') as f:
        html.append(f.read().decode('utf-8'))
    try:
        barrier.wait()
    except threading.BrokenBarrierError:
        # Raised when the barrier is aborted or reset while this thread waits.
        pass


html1 = []
html2 = []
myBarrier = threading.Barrier(2)

thread1 = threading.Thread(target=download, args=(html1, myBarrier))
thread2 = threading.Thread(target=download, args=(html2, myBarrier))

thread1.start()
thread2.start()
myBarrier.wait()
myBarrier.abort()

if (html1 != []):
    print("thread1")
if (html2 != []):
    print("thread2")

Page 116

 
import urllib.request
import threading


def download(html, condition):
    """Download http://www.example.com/, append the text to html, then notify condition."""
    with urllib.request.urlopen('http://www.example.com/') as f:
        html.append(f.read().decode('utf-8'))
    # notify() is called with the condition's lock held, via the with block.
    with condition:
        condition.notify()


html1 = []
html2 = []

myCondition = threading.Condition()

thread1 = threading.Thread(target=download, args=(html1, myCondition))
thread2 = threading.Thread(target=download, args=(html2, myCondition))

thread1.start()
thread2.start()
with myCondition:
    myCondition.wait()

if (html1 != []):
    print("thread1")
if (html2 != []):
    print("thread2")

Page 118

 
import urllib.request
import threading


def download(html, condition):
    """Download the page into html, then count the completion and notify condition.

    The global myCount is incremented while the condition's lock is held,
    immediately before notify() is called.
    """
    with urllib.request.urlopen('http://www.example.com/') as f:
        html.append(f.read().decode('utf-8'))
    global myCount
    with condition:
        myCount += 1
        condition.notify()


myCount = 0

html1 = []
html2 = []
html3 = []

myCondition = threading.Condition(threading.Lock())

thread1 = threading.Thread(target=download, args=(html1, myCondition))
thread2 = threading.Thread(target=download, args=(html2, myCondition))
thread3 = threading.Thread(target=download, args=(html3, myCondition))
thread1.start()
thread2.start()
thread3.start()
with myCondition:
    myCondition.wait_for(lambda: myCount >= 2)

if (html1 != []):
    print("thread1")
if (html2 != []):
    print("thread2")
if (html3 != []):
    print("thread3")

Page 125

import multiprocessing


def addCount(q):
    """Put the integers 0 to 999 onto q in ascending order."""
    value = 0
    while value < 1000:
        q.put(value)
        value += 1


def getCount(q, l):
    """Forever take the next item from q and print it while holding l.

    The loop has no exit condition, so this function never returns.
    """
    while True:
        i = q.get()
        with l:
            print(i)


if __name__ == '__main__':
    q = multiprocessing.Queue()
    pLock = multiprocessing.Lock()

    p1 = multiprocessing.Process(target=addCount, args=(q,))
    p2 = multiprocessing.Process(target=getCount, args=(q, pLock))
    p3 = multiprocessing.Process(target=getCount, args=(q, pLock))
    p1.start()
    p2.start()
    p3.start()

    p3.join()

Page 128-129

import multiprocessing
from time import sleep


def addCount(con):
    """Send the integers 0 to 499 through con in ascending order."""
    value = 0
    while value < 500:
        con.send(value)
        value += 1


def getCount(con, l):
    """Forever receive a value from con and print it with this process's name.

    Each pass sleeps 5 ms before calling recv(); the print is done while
    holding l. The loop has no exit condition.
    """
    while True:
        sleep(0.005)
        i = con.recv()
        with l:
            print(i, multiprocessing.current_process().name)


if __name__ == '__main__':

    con1, con2 = multiprocessing.Pipe()
    pLock = multiprocessing.Lock()

    p1 = multiprocessing.Process(target=addCount, args=(con1,))
    p2 = multiprocessing.Process(target=getCount, args=(con2, pLock))
    p3 = multiprocessing.Process(target=getCount, args=(con2, pLock))
    p1.start()
    p2.start()
    p3.start()

    p2.join()

Page 131

import time
import multiprocessing
import ctypes


def myUpdate(val):
    """Add 1 to val.value 1000 times, sleeping 5 ms before each increment."""
    for i in range(1000):
        time.sleep(0.005)
        # Read-modify-write of the shared value; no lock is taken explicitly here.
        val.value += 1


if __name__ == '__main__':
    myValue = multiprocessing.Value(ctypes.c_int, 0)
    p1 = multiprocessing.Process(target=myUpdate, args=(myValue,))
    p2 = multiprocessing.Process(target=myUpdate, args=(myValue,))
    p2.start()
    p1.start()
    p1.join()
    p2.join()
    print(myValue.value)

Page 135 

import time
import multiprocessing
import multiprocessing.shared_memory


def myUpdate(name, lock):
    """Attach to the existing shared memory block called name and bump byte 0.

    Byte 0 of the block is incremented 127 times, each increment made
    while holding lock, with a 5 ms sleep before each one. The local
    handle is closed at the end; the block itself is not unlinked here.
    """
    mySharedMem = multiprocessing.shared_memory.SharedMemory(
        create=False, name=name)
    for i in range(127):
        time.sleep(0.005)
        with lock:
            mySharedMem.buf[0] = mySharedMem.buf[0]+1
    mySharedMem.close()


if __name__ == '__main__':
    mylock = multiprocessing.Lock()
    mySharedMem = multiprocessing.shared_memory.SharedMemory(
        create=True, size=10)
    mySharedMem.buf[0] = 1
    p1 = multiprocessing.Process(target=myUpdate,
                                 args=(mySharedMem.name, mylock))
    p2 = multiprocessing.Process(target=myUpdate,
                                 args=(mySharedMem.name, mylock))
    p2.start()
    p1.start()
    p1.join()
    p2.join()
    print(mySharedMem.buf[0])
    mySharedMem.close()
    mySharedMem.unlink()

Page 136

 
import time
import multiprocessing
import multiprocessing.shared_memory
import sys


def myUpdate(mySharedMem, lock):
    """Increment the 8-byte counter at the start of mySharedMem.buf 1000 times.

    The counter is stored in the machine's native byte order. Every
    increment is made while holding lock, after a 5 ms sleep, and
    mySharedMem is closed once the loop is done.
    """
    counter_bytes = slice(0, 8)
    for _ in range(1000):
        time.sleep(0.005)
        with lock:
            current = int.from_bytes(
                mySharedMem.buf[counter_bytes], byteorder=sys.byteorder)
            mySharedMem.buf[counter_bytes] = (current + 1).to_bytes(
                8, byteorder=sys.byteorder)
    mySharedMem.close()


if __name__ == '__main__':
    mylock = multiprocessing.Lock()
    mySharedMem = multiprocessing.shared_memory.SharedMemory(
        create=True, size=10)
    # The first 8 bytes hold the counter in native byte order; start it at 1.
    mySharedMem.buf[0:8] = int.to_bytes(1, 8,
                                        byteorder=sys.byteorder)
    p1 = multiprocessing.Process(target=myUpdate,
                                 args=(mySharedMem, mylock))
    p2 = multiprocessing.Process(target=myUpdate,
                                 args=(mySharedMem, mylock))
    p2.start()
    p1.start()
    p1.join()
    p2.join()
    # Read the counter back once both children have finished.
    count = int.from_bytes(mySharedMem.buf[0:8],
                           byteorder=sys.byteorder)
    print(count)
    # close() drops this process's handle; unlink() removes the block itself.
    mySharedMem.close()
    mySharedMem.unlink()

Page 138

 
import time
import multiprocessing
import multiprocessing.shared_memory


def myUpdate(mySharedList, lock):
    """Add 1 to element 0 of mySharedList 1000 times.

    Every increment is made while holding lock, after a 5 ms sleep; the
    list's underlying shm handle is closed once the loop is done.
    """
    for _ in range(1000):
        time.sleep(0.005)
        with lock:
            mySharedList[0] += 1
    mySharedList.shm.close()


if __name__ == '__main__':
    mylock = multiprocessing.Lock()
    mySharedList = multiprocessing.shared_memory.ShareableList(
        sequence=[1])
    p1 = multiprocessing.Process(target=myUpdate,
                                 args=(mySharedList, mylock))
    p2 = multiprocessing.Process(target=myUpdate,
                                 args=(mySharedList, mylock))
    p2.start()
    p1.start()
    p1.join()
    p2.join()
    print(mySharedList[0])
    mySharedList.shm.close()
    mySharedList.shm.unlink()

Page 139

import time
import multiprocessing
import multiprocessing.shared_memory
import multiprocessing.managers


def myUpdate(mySharedList, lock):
    """Add 1 to element 0 of mySharedList 1000 times.

    Every increment is made while holding lock, after a 5 ms sleep; the
    list's underlying shm handle is closed once the loop is done.
    """
    for i in range(1000):
        time.sleep(0.005)
        with lock:
            mySharedList[0] = mySharedList[0]+1
    mySharedList.shm.close()


if __name__ == '__main__':
    mylock = multiprocessing.Lock()
    with multiprocessing.managers.SharedMemoryManager() as smm:
        mySharedList = smm.ShareableList([1])
        p1 = multiprocessing.Process(target=myUpdate,
                                     args=(mySharedList, mylock))
        p2 = multiprocessing.Process(target=myUpdate,
                                     args=(mySharedList, mylock))
        p2.start()
        p1.start()
        p1.join()
        p2.join()
        print(mySharedList[0])

Page 139

 
import ctypes
import multiprocessing
import time


def myPi(m, n, PI):
    """Add 4 times the series terms k = m..n inclusive to PI.value.

    The partial sum is built locally; PI is entered as a context manager
    only around the final update of PI.value.
    """
    partial = 0
    for k in range(m, n + 1):
        sign = -1 if k % 2 == 0 else 1
        partial = partial + sign / (2 * k - 1)
    with PI:
        PI.value += partial * 4


if __name__ == '__main__':
    N = 10000000
    PI = multiprocessing.Value(ctypes.c_double, 0.0)
    p1 = multiprocessing.Process(target=myPi, args=(N//2+1, N, PI))
    t1 = time.perf_counter()
    p1.start()
    myPi(1, N//2, PI)
    p1.join()
    t2 = time.perf_counter()
    print((t2-t1)*1000)
    print(PI.value)

Page 144

 
import multiprocessing.pool
import multiprocessing
import random
import time


def myProcess():
    """Sleep for 1, 2 or 3 seconds (chosen at random), then print this process's name."""
    time.sleep(random.randrange(1, 4))
    print(multiprocessing.current_process().name)


if __name__ == '__main__':
    p = multiprocessing.pool.Pool(2)
    p.apply_async(myProcess)
    p.apply_async(myProcess)
    p.close()
    p.join()

Page 145 

import multiprocessing.pool
import multiprocessing
import random
import time


def myProcess():
    time.sleep(random.randrange(1, 4))
    return multiprocessing.current_process().name


def myCallback(result):
    print(result)


if __name__ == '__main__':
    p = multiprocessing.pool.Pool(2)
    p.apply_async(myProcess, callback=myCallback)
    p.apply_async(myProcess, callback=myCallback)
    p.close()
    p.join()

Page 146 

import multiprocessing.pool
import multiprocessing
import random
import time


def myProcess():
    time.sleep(random.randrange(1, 4))
    return multiprocessing.current_process().name


if __name__ == '__main__':
    with multiprocessing.pool.Pool(2) as p:
        asr1 = p.apply_async(myProcess)
        asr2 = p.apply_async(myProcess)
        result1 = asr1.get()
        result2 = asr2.get()
        print(result1, result2)

Page 147

import multiprocessing.pool
import multiprocessing
import random
import time


def myProcess():
    time.sleep(random.randrange(1, 6))
    return multiprocessing.current_process().name


if __name__ == '__main__':
    with multiprocessing.pool.Pool(2) as p:
        asr1 = p.apply_async(myProcess)
        asr2 = p.apply_async(myProcess)

        waiting = True
        while waiting:
            time.sleep(0.01)
            if (asr1.ready()):
                print(asr1.get())
                break
            if (asr2.ready()):
                print(asr2.get())
                break

Page 147-148

 
import multiprocessing
import multiprocessing.pool
import time


def myPi(m, n):
    """Return 4 times the sum of the series terms k = m..n inclusive.

    Odd k contributes +1/(2k-1), even k contributes -1/(2k-1).
    """
    total = 0
    for k in range(m, n + 1):
        denominator = 2 * k - 1
        if k % 2:
            total += 1 / denominator
        else:
            total += -1 / denominator
    return total * 4


if __name__ == '__main__':
    N = 10000000
    with multiprocessing.pool.Pool(2) as p:
        t1 = time.perf_counter()
        asr1 = p.apply_async(myPi, args=(1, N//2))
        asr2 = p.apply_async(myPi, args=(N//2+1, N))
        PI = asr1.get()
        PI += asr2.get()
        t2 = time.perf_counter()
    print((t2-t1)*1000)
    print(PI)

Page 149 (top)

 
import multiprocessing
import multiprocessing.pool


def myFunc(x):
    return x**2


if __name__ == '__main__':
    with multiprocessing.pool.Pool(10) as p:
        asr = p.map_async(myFunc, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        res = asr.get()

    print(res)

Page 149 (bottom)

 
import multiprocessing
import multiprocessing.pool
import time


def myPi(r):
    """Return 4 times the sum of the series terms for the range given by r.

    r is a two-item sequence (m, n); terms k = m..n inclusive are summed,
    odd k contributing +1/(2k-1) and even k contributing -1/(2k-1).
    """
    first, last = r
    total = 0
    for k in range(first, last + 1):
        sign = -1 if k % 2 == 0 else 1
        total = total + sign / (2 * k - 1)
    return total * 4


if __name__ == '__main__':
    N = 10000000
    with multiprocessing.pool.Pool(2) as p:
        t1 = time.perf_counter()
        asr = p.map_async(myPi, [(1, N//2), (N//2+1, N)])
        res = asr.get()
        t2 = time.perf_counter()

    print((t2-t1)*1000)
    PI = res[0]+res[1]
    print(PI)

Page 150

 
import multiprocessing
import multiprocessing.pool
import time


def myPi(m, n):
    """Return 4 times the sum of the series terms k = m..n inclusive.

    Odd k contributes +1/(2k-1), even k contributes -1/(2k-1).
    """
    running = 0
    for k in range(m, n + 1):
        sign = -1 if k % 2 == 0 else 1
        running = running + sign / (2 * k - 1)
    return running * 4


if __name__ == '__main__':
    N = 10000000
    with multiprocessing.pool.Pool(2) as p:
        t1 = time.perf_counter()
        asr = p.starmap_async(myPi, [(1, N//2), (N//2+1, N)])
        res = asr.get()
        t2 = time.perf_counter()

    print((t2-t1)*1000)
    PI = res[0]+res[1]
    print(PI)

Page 151

 
import multiprocessing
import multiprocessing.pool


def myFunc(x):
    return x**2


if __name__ == '__main__':
    with multiprocessing.pool.Pool(10) as p:
        results = p.imap(myFunc, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        for result in results:
            print(result)

Page 152

import multiprocessing
import multiprocessing.pool


def myFunc(x):
    return x**2


if __name__ == '__main__':
    with multiprocessing.pool.Pool(10) as p:
        results = p.imap_unordered(myFunc, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        for result in results:
            print(result)

Page 153

import functools
import multiprocessing
import multiprocessing.pool


def myFunc(x):
    return x**2


def mySum(value, item):
    return value+item


if __name__ == '__main__':
    with multiprocessing.pool.Pool(10) as p:
        asr = p.map_async(myFunc, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        res = asr.get()
    sumsquares = functools.reduce(mySum, res)
    print(sumsquares)

Page 154

import multiprocessing
import multiprocessing.pool
import time
import ctypes


def counter():
    """Add 1 to the global count's value 10000 times.

    count is entered as a context manager around each read-modify-write.
    It is the module global that setup() assigns.
    """
    global count
    for i in range(10000):
        with count:
            temp = count.value+1
            count.value = temp


def setup(var):
    """Store var in the module global count; passed to Pool as its initializer."""
    global count
    count = var


if __name__ == '__main__':
    myCounter = multiprocessing.Value(ctypes.c_int, 0)
    with multiprocessing.Pool(2, initializer=setup,
                              initargs=(myCounter,)) as p:

        t1 = time.perf_counter()
        asr1 = p.apply_async(counter)
        asr2 = p.apply_async(counter)
        asr1.wait()
        asr2.wait()
        t2 = time.perf_counter()
        print(myCounter.value)
    print((t2-t1)*1000)

Page 160

import multiprocessing
import multiprocessing.pool
import time


def myFunc(x):
    """Replace every element of x, in place, with its square."""
    for index in range(len(x)):
        value = x[index]
        x[index] = value ** 2


if __name__ == '__main__':
    man = multiprocessing.Manager()
    myList = man.list([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    p = multiprocessing.Process(target=myFunc, args=(myList,))
    t1 = time.perf_counter()
    p.start()
    p.join()
    t2 = time.perf_counter()

    print((t2-t1)*1000)
    print(myList)

Page 161

import multiprocessing


def count(myCounter):
    """Add 1 to myCounter.value 1000 times."""
    for i in range(1000):
        # The read and the write of .value are separate operations and no lock is taken here.
        myCounter.value = myCounter.value+1


if __name__ == '__main__':
    man = multiprocessing.Manager()
    counter = man.Value(int, 0)

    p1 = multiprocessing.Process(target=count, args=(counter,))
    p2 = multiprocessing.Process(target=count, args=(counter,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(counter.value)

Page 162

import multiprocessing


def count(myCounter):
    """Add 1 to myCounter.value 1000 times."""
    for i in range(1000):
        # The read and the write of .value are separate operations and no lock is taken here.
        myCounter.value = myCounter.value+1


if __name__ == '__main__':
    man = multiprocessing.Manager()
    counter = man.Value(int, 0)

    p1 = multiprocessing.Process(target=count, args=(counter,))
    p2 = multiprocessing.Process(target=count, args=(counter,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(counter.value)

Page 162

import multiprocessing
import time


def myPi(m, n, PI, lock):
    """Add 4 times the series terms k = m..n inclusive to PI.value.

    The partial sum is built locally; lock is held only around the final
    update of PI.value.
    """
    partial = 0
    for k in range(m, n + 1):
        sign = -1 if k % 2 == 0 else 1
        partial = partial + sign / (2 * k - 1)
    with lock:
        PI.value += partial * 4


if __name__ == '__main__':
    N = 10000000
    man = multiprocessing.Manager()
    PI = man.Value(float, 0.0)
    myLock = man.Lock()
    p1 = multiprocessing.Process(target=myPi,
                                 args=(N//2+1, N, PI, myLock))
    t1 = time.perf_counter()
    p1.start()
    myPi(1, N//2, PI, myLock)
    p1.join()
    t2 = time.perf_counter()
    print((t2-t1)*1000)
    print(PI.value)

Page 165-166

import multiprocessing
import multiprocessing.managers


class Point():
    """A 2D point whose coordinates are read and written through methods."""

    def __init__(self):
        """Start at the origin (0, 0)."""
        self._x = 0
        self._y = 0

    def setxy(self, x, y):
        """Set both coordinates."""
        self._x = x
        self._y = y

    def getxy(self):
        """Return the coordinates as an (x, y) tuple."""
        return (self._x, self._y)


def myFunc(point):
    point.setxy(42, 43)


class myManager(multiprocessing.managers.BaseManager):
    pass


myManager.register("point", Point)

if __name__ == '__main__':
    man = myManager()
    man.start()
    point = man.point()
    p1 = multiprocessing.Process(target=myFunc, args=(point,))
    p1.start()
    p1.join()
    print(point.getxy())

Page 168-169

import multiprocessing
import multiprocessing.managers


class Counter():
    """Hold a count, accessible via getCount/setCount or the count property."""

    def __init__(self):
        """Start the count at 0."""
        self.myCounter = 0

    def getCount(self):
        """Return the current count."""
        return self.myCounter

    def setCount(self, value):
        """Replace the current count with value."""
        self.myCounter = value
    # Property wrapper over the two methods above.
    count = property(getCount, setCount)


class CounterProxy(multiprocessing.managers.BaseProxy):
    """Proxy that presents a managed Counter's count as a value property."""

    # Names of the referent's methods that this proxy forwards to.
    _exposed_ = ('getCount', 'setCount')

    def get(self):
        """Return the result of calling getCount on the referent."""
        return self._callmethod('getCount')

    def set(self, value):
        """Call setCount(value) on the referent."""
        return self._callmethod('setCount', (value,))
    # Reading or assigning proxy.value forwards to get()/set() above.
    value = property(get, set)


class myManager(multiprocessing.managers.BaseManager):
    pass


myManager.register("counter", callable=Counter,
                   proxytype=CounterProxy)


def count(myCounter):
    for i in range(1000):
        myCounter.value = myCounter.value+1


if __name__ == '__main__':
    man = myManager()
    man.start()
    counter = man.counter()

    p1 = multiprocessing.Process(target=count, args=(counter,))
    p2 = multiprocessing.Process(target=count, args=(counter,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print(counter.value)

Page 172

import multiprocessing
import multiprocessing.managers


class SharedList():
    """Wrap a list of ten integers with a getter and an indexed setter."""

    def __init__(self):
        """Initialise the list to the integers 1 to 10."""
        self.list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    def getData(self):
        """Return the underlying list."""
        return self.list

    def setData(self, value, index):
        """Store value at position index; note the (value, index) argument order."""
        self.list[index] = value


class SharedListProxy(multiprocessing.managers.BaseProxy):
    """Proxy that forwards getData and setData to the managed object."""

    # Names of the referent's methods that this proxy forwards to.
    _exposed_ = ('getData', 'setData')

    def getData(self):
        """Return the result of calling getData on the referent."""
        return self._callmethod('getData')

    def setData(self, value, index):
        """Call setData(value, index) on the referent; returns None."""
        self._callmethod('setData', (value, index))


_sharedList = SharedList()


def SharedList():
    return _sharedList


class myManager(multiprocessing.managers.BaseManager):
    pass


myManager.register("SharedList", callable=SharedList,
                   proxytype=SharedListProxy)

if __name__ == '__main__':
    man = myManager(address=("192.168.253.14", 50000), authkey=b"password")
    s = man.get_server()
    s.serve_forever()

Page 174

import multiprocessing
import multiprocessing.managers


class SharedListProxy(multiprocessing.managers.BaseProxy):
    _exposed_ = ('getData', 'setData')

    def getData(self):
        return self._callmethod('getData')

    def setData(self, value, index):
        self._callmethod('setData', (value, index))


class myManager(multiprocessing.managers.BaseManager):
    pass


myManager.register("SharedList", proxytype=SharedListProxy)

if __name__ == '__main__':
    man = myManager(address=("192.168.253.14", 50000),
                    authkey=b"password")
    man.connect()
    myList = man.SharedList()
    print(myList.getData())

Page 174-175

 
import multiprocessing
import multiprocessing.managers


class Math:
    """Holder for the myPi series calculation."""

    def myPi(self, m, n):
        """Return 4 times the sum of the series terms k = m..n inclusive.

        Odd k contributes +1/(2k-1), even k contributes -1/(2k-1).
        """
        total = 0
        for k in range(m, n + 1):
            sign = -1 if k % 2 == 0 else 1
            total = total + sign / (2 * k - 1)
        return total * 4


_math = Math()


def getMath():
    return _math


class MathProxy(multiprocessing.managers.BaseProxy):
    """Proxy that forwards myPi to the managed Math object."""

    # Names of the referent's methods that this proxy forwards to.
    _exposed_ = ("myPi",)

    def myPi(self, m, n):
        """Call myPi(m, n) on the referent and return its result."""
        # _callmethod takes the positional arguments as one tuple; passing
        # m and n separately would put n in the keyword-arguments slot.
        return self._callmethod('myPi', (m, n))


class myManager(multiprocessing.managers.BaseManager):
    pass


myManager.register("math", callable=getMath, proxytype=MathProxy)

if __name__ == '__main__':
    man = myManager(address=("192.168.253.14", 50000),
                    authkey=b"password")
    s = man.get_server()
    s.serve_forever()

Page 175

import multiprocessing
import multiprocessing.managers


class MathProxy(multiprocessing.managers.BaseProxy):
    """Proxy that forwards myPi to the managed math object on the server."""

    # A one-element tuple needs the trailing comma; ('myPi') is just the
    # string 'myPi', which would be read as four one-letter names.
    _exposed_ = ('myPi',)

    def myPi(self, m, n):
        """Call myPi(m, n) on the referent and return its result."""
        return self._callmethod('myPi', (m, n))


class myManager(multiprocessing.managers.BaseManager):
    pass


myManager.register("math", proxytype=MathProxy)

if __name__ == '__main__':
    man = myManager(address=("192.168.253.14", 50000), authkey=b"password")
    man.connect()
    math = man.math()
    print(math.myPi(1, 100))

Page 185

for Linux:

#!/bin/sh
# Prompt for a user name, read one line from standard input, echo it back.
echo "user name: "
# -r stops read from treating backslashes in the input as escapes.
read -r name
# Quoted so the value is printed as typed, without word splitting or globbing.
echo "$name"

for Windows:

@echo off
rem Prompt for a user name, read one line of input into id, echo it back.
echo "User name:"
set /p id=
echo %id%

Top of Page
import subprocess

p = subprocess.Popen(["myscript.bat"],
                     stdout=subprocess.PIPE, stdin=subprocess.PIPE, text=True)
ans = p.stdout.readline()
print("message", ans)
p.stdin.write("mike\n")
ans = p.stdout.readline()
print("message", ans)
as modified at bottom of page
import subprocess

p = subprocess.Popen(["myscript.bat"],
                     stdout=subprocess.PIPE, stdin=subprocess.PIPE, text=True)
ans = p.stdout.readline()
print("message",ans)
p.stdin.write("mike\n")
p.stdin.flush()
ans = p.stdout.readline()
print("message",ans)

Page 186

import subprocess
p = subprocess.Popen(["myscript.bat"], stdout=subprocess.PIPE,
                     stdin=subprocess.PIPE, bufsize=0,
                     universal_newlines=True, text=True)

ans = p.stdout.readline()
print("message", ans)
p.stdin.write("mike\n")

ans = p.stdout.readline()
print("message", ans)

Page 189

import subprocess
import threading


class AsyncReader():
    """Read a stream one character at a time on background daemon threads.

    Each one-character read runs in its own thread so that get() can stop
    waiting after a short timeout instead of blocking on the stream.
    """

    def __init__(self, stream):
        """Remember stream and start the first one-character read."""
        self._stream = stream
        self._char = ""
        self.t = threading.Thread(target=self._get, daemon=True)
        self.t.start()

    def get(self):
        """Return the character read by the pending thread, or "" on timeout.

        Waits up to 0.04 s for the pending read. If it has not finished,
        "" is returned and the same read stays pending for the next call.
        """
        self.t.join(0.04)
        if self.t.is_alive():
            return ""
        else:
            result = self._char
            # Start reading the following character before handing this one back.
            self.t = threading.Thread(target=self._get, daemon=True)
            self.t.start()
            return result

    def _get(self):
        """Thread target: blocking read of one character into self._char."""
        self._char = self._stream.read(1)

    def readmessage(self):
        """Concatenate characters from get() until it returns "", and return them."""
        ans = ""
        while True:
            char = self.get()
            if char == "":
                break
            ans += char
        return ans


p = subprocess.Popen(["myscript.bat"],
                     stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=1, universal_newlines=True, text=True)

aRead = AsyncReader(p.stdout)

ans = aRead.readmessage()

print("message", ans)
p.stdin.write("mike\n")

ans = aRead.readmessage()
print("message", ans)

Page 197

import concurrent.futures
import time
import urllib.request


def download():
    """Fetch http://www.example.com/ and return its body decoded as UTF-8."""
    with urllib.request.urlopen('http://www.example.com/') as response:
        raw = response.read()
        return raw.decode('utf-8')


with concurrent.futures.ThreadPoolExecutor() as executor:
    t1 = time.perf_counter()
    f1 = executor.submit(download)
    f2 = executor.submit(download)
    t2 = time.perf_counter()
    print((t2-t1)*1000)
    print(f1.result()[:25])
    print(f2.result()[:25])

Page 198 modified by top

import concurrent.futures
import time
import urllib.request


def download():
    """Return the UTF-8 decoded body of http://www.example.com/."""
    with urllib.request.urlopen('http://www.example.com/') as response:
        return response.read().decode('utf-8')


# Start timing before the pool is created.
t1 = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor() as executor:
    f1 = executor.submit(download)
    f2 = executor.submit(download)

    t2 = time.perf_counter()
    # Time to create the pool and submit both jobs, in milliseconds.
    print((t2-t1)*1000)

    # Return as soon as at least one of the downloads has finished.
    res = concurrent.futures.wait([f1, f2],
                                  return_when=concurrent.futures.FIRST_COMPLETED)
    for f in res.done:
        print(f.result()[:25])

Page 198 modified by bottom

import concurrent.futures
import time
import urllib.request


def download():
    """Download http://www.example.com/ and hand back its text (UTF-8)."""
    with urllib.request.urlopen('http://www.example.com/') as reply:
        text = reply.read().decode('utf-8')
    return text


with concurrent.futures.ThreadPoolExecutor() as executor:
    f1 = executor.submit(download)
    f2 = executor.submit(download)
    # Handle each future as it finishes, whichever completes first.
    for f in concurrent.futures.as_completed([f1, f2]):
        print(f.result()[:25])

Page 199

import concurrent.futures
import urllib.request


def download():
    """Fetch the example.com front page and return it as a str (UTF-8)."""
    with urllib.request.urlopen('http://www.example.com/') as page:
        return page.read().decode('utf-8')


def processDownload(f):
    """Done-callback: print the first 25 characters of the future's result."""
    page = f.result()
    print(page[:25])


with concurrent.futures.ThreadPoolExecutor() as executor:
    f1 = executor.submit(download)
    f2 = executor.submit(download)
    # processDownload is invoked with the future once each download is done.
    f1.add_done_callback(processDownload)
    f2.add_done_callback(processDownload)
    print("waiting")

Page 200

import concurrent.futures
import urllib.request


def download():
    """Retrieve http://www.example.com/ and return the decoded (UTF-8) body."""
    with urllib.request.urlopen('http://www.example.com/') as response:
        body = response.read()
    return body.decode('utf-8')


def processDownloadA(f):
    """Done-callback: print the start of the result, then chain a second
    download whose completion is handled by processDownloadB."""
    print(f.result()[:25])
    # Uses the module-level executor to queue the follow-up download.
    follow_up = executor.submit(download)
    follow_up.add_done_callback(processDownloadB)


def processDownloadB(f):
    """Done-callback: print the first 25 characters of the future's result."""
    text = f.result()
    print(text[:25])


with concurrent.futures.ThreadPoolExecutor() as executor:
    f1 = executor.submit(download)
    f1.add_done_callback(processDownloadA)

    print("waiting")
    # Spin forever so the with block never exits; processDownloadA submits
    # further work to this executor from inside its callback.
    while True:
        pass

Page 203-204

import concurrent.futures
import threading
import time
from cmath import sqrt

myCounter = 0
countlock = threading.Lock()


def count():
    """Add 100000 to the global myCounter, one locked increment at a time."""
    global myCounter
    for _ in range(100000):
        with countlock:
            bumped = myCounter + 1
            # Extra work between the read and the write; result is unused.
            sqrt(2)
            myCounter = bumped


with concurrent.futures.ThreadPoolExecutor() as execute:
    t1 = time.perf_counter()
    # Run count() twice concurrently on pool threads.
    f1 = execute.submit(count)
    f2 = execute.submit(count)
    # Block until both counting jobs have finished.
    concurrent.futures.wait(
        [f1, f2], return_when=concurrent.futures.ALL_COMPLETED)
    t2 = time.perf_counter()
# Elapsed time in milliseconds, then the final counter value.
print((t2-t1)*1000)
print(myCounter)

Page 205

import concurrent.futures
import multiprocessing
import time
import ctypes


def counter():
    """Add 10000 to the shared value held in the global ``count``.

    ``count`` is used as a context manager around each read-modify-write.
    """
    global count
    for _ in range(10000):
        with count:
            count.value = count.value + 1


def setup(var):
    global count
    count = var


if __name__ == '__main__':
    # Shared integer; handed to every worker process through the initializer.
    myCounter = multiprocessing.Value(ctypes.c_int, 0)
    with concurrent.futures.ProcessPoolExecutor(2,
                                                initializer=setup, initargs=(myCounter,)) as execute:

        t1 = time.perf_counter()
        f1 = execute.submit(counter)
        f2 = execute.submit(counter)
        # Block until both counting jobs have finished.
        concurrent.futures.wait([f1, f2],
                                return_when=concurrent.futures.ALL_COMPLETED)
        t2 = time.perf_counter()
        print(myCounter.value)
    # Elapsed time in milliseconds.
    print((t2-t1)*1000)

Page 206

import concurrent.futures
import multiprocessing
import multiprocessing.managers
import time
import ctypes


def counter(count, lock):
    """Increment ``count.value`` 10000 times, holding ``lock`` for each
    read-modify-write."""
    for _ in range(10000):
        with lock:
            count.value = count.value + 1


if __name__ == '__main__':

    with multiprocessing.Manager() as man:
        with concurrent.futures.ProcessPoolExecutor(2) as execute:

            # Counter and lock are created by the manager and passed to the
            # workers as ordinary arguments.
            myCounter = man.Value(ctypes.c_int, 0)
            myLock = man.Lock()

            t1 = time.perf_counter()
            f1 = execute.submit(counter, myCounter, myLock)
            f2 = execute.submit(counter, myCounter, myLock)
            # Block until both counting jobs have finished.
            concurrent.futures.wait([f1, f2],
                                    return_when=concurrent.futures.ALL_COMPLETED)
            t2 = time.perf_counter()
            print(myCounter.value)
    # Elapsed time in milliseconds.
    print((t2-t1)*1000)

Page 207

import concurrent.futures
import time


def taskA():
    """Sleep for one second, then block on and return the result of f2."""
    time.sleep(1)
    return f2.result()


def taskB():
    """Sleep for one second, then block on and return the result of f1."""
    time.sleep(1)
    return f1.result()


with concurrent.futures.ThreadPoolExecutor(2) as execute:
    # taskA waits on f2 and taskB waits on f1, so each future depends on the
    # other and neither can complete.
    f1 = execute.submit(taskA)
    f2 = execute.submit(taskB)
    concurrent.futures.wait([f1, f2],
                            return_when=concurrent.futures.ALL_COMPLETED)
    print(f1.result())

Page 208

import concurrent.futures
import time


def myPi(m, n):
    """Return four times the partial sum of the alternating series
    (+/-)1/(2k-1) for k = m..n inclusive (positive terms for odd k)."""
    total = 0
    for k in range(m, n+1):
        sign = -1 if k % 2 == 0 else 1
        total += sign / (2 * k - 1)
    return total*4


if __name__ == '__main__':
    N = 10000000
    with concurrent.futures.ProcessPoolExecutor(2) as execute:
        t1 = time.perf_counter()
        # Split the series into two halves, one per worker process.
        f1 = execute.submit(myPi, 1, N//2)
        f2 = execute.submit(myPi, N//2+1, N)
        # The two partial results add up to the full sum.
        PI = f1.result()
        PI += f2.result()
        t2 = time.perf_counter()
    # Elapsed time in milliseconds, then the computed value.
    print((t2-t1)*1000)
    print(PI)

Page 216

import asyncio


async def test1(msg):
    print(msg)


async def main():
    """Entry coroutine: run test1 to completion with a greeting."""
    greeting = "Hello Coroutine World"
    await test1(greeting)

asyncio.run(main())

Page 219

import asyncio


async def count(n):
    """Print the integers 0..n-1, one per line, and return n."""
    for value in range(n):
        print(value)
    return n


async def main(myValue):
    """Schedule count(10) as a task, print a greeting, sleep for five
    seconds and return ``myValue`` unchanged."""
    # The task is never awaited; it runs while this coroutine sleeps.
    counting = asyncio.create_task(count(10))
    print("Hello Coroutine World")
    await asyncio.sleep(5)
    return myValue

result = asyncio.run(main(42))
print(result)

Page 221

import asyncio


async def test1(msg):
    print(msg)


async def main():
    """Queue two test1 tasks, run a third call directly, then yield once so
    the queued tasks get a chance to run."""
    for label in ("one", "two"):
        asyncio.create_task(test1(label))

    await test1("three")
    print("Hello Coroutine World")
    # sleep(0) hands control back to the event loop.
    await asyncio.sleep(0)

asyncio.run(main())

Page 225

import asyncio


async def test1(msg):
    print(msg)
    return msg


async def main():
    """Run two test1 tasks, wait for both, and print each result."""
    tasks = [asyncio.create_task(test1(label)) for label in ("one", "two")]
    done, pending = await asyncio.wait(tasks,
                                       return_when=asyncio.ALL_COMPLETED)
    # done is a set, so the results are printed in no particular order.
    for finished in done:
        print(finished.result())
    print("Hello Coroutine World")
asyncio.run(main())

Page 226

import asyncio


async def test1(msg):
    print(msg)
    return msg


async def main():
    """Run test1 for "one" and "two" via gather and print the result list."""
    result = await asyncio.gather(*(test1(label) for label in ("one", "two")))
    print(result)
    print("Hello Coroutine World")

asyncio.run(main())

Page 227

import asyncio


async def test1(msg):
    try:
        await asyncio.sleep(0)
    except:
        pass
    print(msg)
    return msg


async def main():
    """Start test1 as a task, let it reach its first suspension point,
    request cancellation, then yield once more."""
    task = asyncio.create_task(test1("one"))
    await asyncio.sleep(0)
    task.cancel()
    print("Hello Coroutine World")
    await asyncio.sleep(0)

asyncio.run(main())

Page 228 top

import asyncio


async def test(msg):
    print(msg)
    raise Exception("Test exception")


async def main():
    """Await a task that raises and report the failure instead of crashing.

    Only ``Exception`` subclasses are handled. Cancellation and
    KeyboardInterrupt, which a bare ``except:`` would also swallow, are
    allowed to propagate.
    """
    t1 = asyncio.create_task(test("one"))
    try:
        # Awaiting the task re-raises whatever the task raised.
        await t1
    except Exception:
        print("an exception has occurred")
    print("Hello Coroutine World")
    await asyncio.sleep(0)

asyncio.run(main())

Page 228 bottom

import asyncio


async def test(msg):
    print(msg)
    raise Exception("Test exception")


async def main():
    """Wait for a failing task and print the repr of the exception it
    stored, without re-raising it."""
    failing = asyncio.create_task(test("one"))

    done, pending = await asyncio.wait([failing])
    finished = done.pop()
    print(repr(finished.exception()))

    print("Hello Coroutine World")
    await asyncio.sleep(0)

asyncio.run(main())

Page 229

import asyncio


async def count():
    """Add 1000 to the global myCounter without ever suspending."""
    global myCounter
    for _ in range(1000):
        myCounter += 1


async def main():
    """Run two count() tasks to completion and print the global counter."""
    workers = [asyncio.create_task(count()) for _ in range(2)]
    await asyncio.wait(workers)
    print(myCounter)

myCounter = 0
asyncio.run(main())

Page 230

import asyncio
import asyncio.locks


async def count():
    """Add 1000 to the global myCounter, holding myLock around each
    read / yield / write sequence."""
    global myCounter
    global myLock
    for _ in range(1000):
        async with myLock:
            bumped = myCounter + 1
            # Yield between the read and the write; the lock keeps the other
            # task from updating the counter in the gap.
            await asyncio.sleep(0)
            myCounter = bumped


async def main():
    """Run two count() tasks to completion and print the global counter.

    The lock shared by the tasks is created here, inside the running event
    loop. Before Python 3.10 an asyncio lock bound itself to the loop that
    was current when it was constructed, so a lock built at import time did
    not belong to the loop that asyncio.run() creates.
    """
    global myLock
    myLock = asyncio.Lock()
    t1 = asyncio.create_task(count())
    t2 = asyncio.create_task(count())
    await asyncio.wait([t1, t2])
    print(myCounter)

myCounter = 0
# Placeholder only; main() installs the real lock once the loop is running.
myLock = None
asyncio.run(main())

Page 232

import asyncio
import random

idGlobal = 0


async def inner(idparam):
    """Yield once, then print "error" if the global idGlobal no longer
    equals ``idparam``."""
    await asyncio.sleep(0)
    if idGlobal != idparam:
        print("error")


async def outer():
    """Store a random id (0..10000) in the global idGlobal and pass the same
    value to inner()."""
    global idGlobal
    chosen = random.randint(0, 10000)
    idGlobal = chosen
    await inner(chosen)


async def main():
    """Run two outer() coroutines concurrently."""
    await asyncio.gather(*(outer() for _ in range(2)))

asyncio.run(main())

Page 233

import asyncio
import random
import contextvars

idGlobalCtx = contextvars.ContextVar("id")


async def inner(idparam):
    """Yield once, then print ``idparam`` next to the value held by the
    idGlobalCtx context variable, appending "error" if they differ."""
    await asyncio.sleep(0)
    current = idGlobalCtx.get()
    print(idparam, current, end=" ")
    if current != idparam:
        print("error", end="")
    print()


async def outer():
    """Store a random id (0..10000) in the idGlobalCtx context variable and
    pass the same value to inner().

    No ``global`` declaration is needed: the value lives in the context
    variable, and set() does not rebind any module-level name.
    """
    chosen = random.randint(0, 10000)
    idGlobalCtx.set(chosen)
    await inner(chosen)


async def main():
    """Run two outer() coroutines concurrently via gather."""
    await asyncio.gather(*(outer() for _ in range(2)))

asyncio.run(main())

Page 234

import asyncio
import asyncio.queues


async def addCount(q):
    """Producer: put the integers 0..999 on ``q`` in order."""
    for number in range(1000):
        await q.put(number)


async def getCount(q, id):
    """Consumer: take items from ``q`` forever, printing each one tagged
    with ``id``. It only stops when it is cancelled."""
    while True:
        i = await q.get()
        print(id, i)
        # Mark the item as processed so that q.join() in main() can return.
        q.task_done()
        await asyncio.sleep(0)


async def main():
    """Run one producer and two consumers over a shared queue.

    The consumers never return on their own, so waiting on them directly
    would hang once the queue is empty. Instead this waits for the producer,
    then for the queue to drain, and finally cancels the consumers.
    """
    q = asyncio.queues.Queue()
    t1 = asyncio.create_task(addCount(q))
    t2 = asyncio.create_task(getCount(q, "t2"))
    t3 = asyncio.create_task(getCount(q, "t3"))
    await t1
    await q.join()
    for consumer in (t2, t3):
        consumer.cancel()
    # Collect the cancellations so no task is left pending at shutdown.
    await asyncio.gather(t2, t3, return_exceptions=True)

asyncio.run(main())

Page 243

import asyncio
import urllib.parse
import time
import email


def parseHeaders(headers):
    """Parse a block of "Name: value" header lines into a plain dict."""
    parsed = email.message_from_string(headers)
    return {name: value for name, value in parsed.items()}


async def download(url):
    """Fetch /index.html from the host named in ``url`` and return the body.

    Only the hostname of ``url`` is used: the connection always goes to port
    443 with TLS and the request path is fixed. The body length is taken
    from the Content-Length header.

    Raises:
        KeyError: the response has no Content-Length header.
        asyncio.IncompleteReadError: the server closed the connection before
            sending the whole body.
    """
    url = urllib.parse.urlsplit(url)
    reader, writer = await asyncio.open_connection(
        url.hostname, 443, ssl=True)
    try:
        request = (
            f"GET /index.html HTTP/1.1\r\n"
            f"Host: {url.hostname}\r\n"
            f"\r\n"
        )
        writer.write(request.encode('ascii'))
        await writer.drain()

        # The status line is read and discarded.
        await reader.readline()

        headers = ""
        while True:
            line = (await reader.readline()).decode('ascii')
            # A blank line ends the headers; "" means the peer closed the
            # connection, which would otherwise spin this loop forever.
            if line in ("\r\n", ""):
                break
            headers += line

        # Header names are case-insensitive, so match Content-Length loosely.
        lengths = [value for name, value in parseHeaders(headers).items()
                   if name.lower() == "content-length"]
        if not lengths:
            raise KeyError("Content-Length")
        length = int(lengths[0])

        # read(n) may return fewer than n bytes; readexactly waits for all.
        body = await reader.readexactly(length)
        return body.decode('utf8')
    finally:
        # Close the connection on the error paths as well.
        writer.close()
        await writer.wait_closed()


async def main():
    """Download the same page twice concurrently, then print the elapsed
    time in milliseconds and the first 25 characters of each result."""
    target = 'http://www.example.com/'
    start = time.perf_counter()
    results = await asyncio.gather(download(target), download(target))
    end = time.perf_counter()
    print((end-start)*1000)
    for page in results[:2]:
        print(page[:25])


asyncio.run(main())

Page 247

import asyncio
import email
import email.utils


async def handleRequest(reader, writer):
    """Serve one HTTP request: read the request head, print it, send a fixed
    HTML page and close the connection.

    Args:
        reader: stream providing ``readline()`` for the incoming request.
        writer: stream used to send the response; it is always closed before
            this coroutine returns.
    """
    try:
        headers = ""
        while True:
            line = (await reader.readline()).decode('ascii')
            # A blank line ends the request head; "" means the client hung
            # up, which would otherwise spin this loop forever.
            if line in ("\r\n", ""):
                break
            headers += line
        print(headers)
        html = ("<html><head><title>Test Page</title></head><body>"
                "<p>page content"
                "</p></body></html>\r\n")
        # Content-Length counts bytes, so measure the encoded body.
        body = html.encode("utf8")
        headers = ("HTTP/1.1 200 OK\r\n"
                   "Content-Type: text/html; charset=UTF-8\r\n"
                   "Server:PythonAcyncio\r\n"
                   f"Date: {email.utils.formatdate(timeval=None,localtime=False, usegmt=True)}\r\n"
                   f"Content-Length:{len(body)}\r\n\r\n"
                   )
        writer.write(headers.encode("ascii")+body)
        await writer.drain()
    finally:
        # wait_closed() only completes after close() has been requested.
        writer.close()
        await writer.wait_closed()


async def main():
    """Start a server on port 8080 that passes every connection to
    handleRequest, and serve until cancelled."""
    listener = await asyncio.start_server(handleRequest, "", 8080)
    async with listener:
        await listener.serve_forever()


asyncio.run(main())

Page 250

import urllib.request
import asyncio
import time

async def download():
    """Coroutine wrapper around a blocking urlopen call; returns the page
    body decoded as UTF-8."""
    # urlopen is a synchronous call; nothing here is awaited.
    with urllib.request.urlopen('http://www.example.com/') as response:
        return response.read().decode('utf-8')

async def main():
    """Gather two download() coroutines, then print the elapsed time in
    milliseconds and the first 25 characters of the first result."""
    started = time.perf_counter()
    results = await asyncio.gather(download(), download())
    finished = time.perf_counter()
    print((finished-started)*1000)
    print(results[0][:25])

asyncio.run(main())

Page 251

import urllib.request
import asyncio
import time


def _download():
    """Blocking helper: fetch http://www.example.com/ and return the body
    decoded as UTF-8."""
    with urllib.request.urlopen('http://www.example.com/') as response:
        return response.read().decode('utf-8')


async def download():
    """Run the blocking _download() via asyncio.to_thread and return its
    result."""
    page = await asyncio.to_thread(_download)
    return page


async def main():
    """Run 1000 downloads concurrently, then print the elapsed time in
    milliseconds and the first 25 characters of the first result."""
    n = 1000
    started = time.perf_counter()
    results = await asyncio.gather(*(download() for _ in range(n)))
    finished = time.perf_counter()
    print((finished-started)*1000)
    print(results[0][:25])

asyncio.run(main())

Page 253

import asyncio
import time


async def tick():
    """Print "TICK n" with an increasing n every half second, forever."""
    ticks = 0
    while True:
        ticks = ticks + 1
        print("TICK", ticks)
        await asyncio.sleep(0.5)


def _myPi(m, n):
    """Return four times the partial sum of the alternating series
    (+/-)1/(2k-1) for k = m..n inclusive (positive terms for odd k)."""
    total = 0
    for k in range(m, n+1):
        sign = -1 if k % 2 == 0 else 1
        total += sign / (2 * k - 1)
    return 4*total


async def myPi(m, n):
    """Run the blocking _myPi(m, n) via asyncio.to_thread and return its
    result."""
    value = await asyncio.to_thread(_myPi, m, n)
    return value


async def main():
    """Run the pi calculation alongside the ticker and report the first task
    to finish.

    Prints the elapsed time in milliseconds followed by the result of the
    completed task. tick() never returns, so the completed task is the
    calculation.
    """
    t1 = time.perf_counter()
    T1 = asyncio.create_task(myPi(1, 10000000))
    T2 = asyncio.create_task(tick())
    done, pending = await asyncio.wait([T1, T2],
                                       return_when=asyncio.FIRST_COMPLETED)
    t2 = time.perf_counter()
    print((t2-t1)*1000)
    print(done.pop().result())

asyncio.run(main())

Page 255

import aiohttp
import asyncio


async def main():
    """GET http://www.example.com with aiohttp and print the status, the
    content type and the first 25 characters of the body."""
    async with aiohttp.ClientSession() as session:
        async with session.get("http://www.example.com") as response:
            print("Status:", response.status)
            print("Content-type:", response.headers['content-type'])

            body = await response.text()
            print("Body:", body[:25], "...")

asyncio.run(main())

Page 256

import tkinter

count = 0
root = None
label1 = None


def stopProg(e):
    """Event handler: destroy the root window. The event ``e`` is unused."""
    root.destroy()


def transfertext(e):
    """Event handler: bump the global click count and show it in label1.
    The event ``e`` is unused."""
    global count
    count += 1
    label1.configure(text=count)


def runGUI():
    """Build the window (Exit button, Click Me button, label) and run the
    Tk main loop until it ends."""
    global root, label1
    root = tkinter.Tk()
    # Each button is packed and then bound to its left-click handler.
    for caption, handler in (("Exit", stopProg), ("Click Me", transfertext)):
        button = tkinter.Button(root, text=caption)
        button.pack()
        button.bind('<Button-1>', handler)
    label1 = tkinter.Label(root, text="nothing to say")
    label1.pack()
    root.mainloop()


runGUI()

Page 257

import asyncio
import tkinter

count = 0
root = None
label1 = None
T1 = None


def stopProg(e):
    """Event handler: cancel the updater task T1, then quit the Tk root.
    The event ``e`` is unused."""
    T1.cancel()
    root.quit()


def transfertext(e):
    """Event handler: increment the global click count and display it in
    label1. The event ``e`` is unused."""
    global count
    count += 1
    label1.configure(text=count)


async def updater(interval):
    """Call root.update() forever, sleeping ``interval`` seconds between
    calls so other tasks can run."""
    while True:
        root.update()
        await asyncio.sleep(interval)


def setupGUI():
    """Build the window (Exit button, Click Me button, label) without
    starting the Tk main loop."""
    global root, label1
    root = tkinter.Tk()

    # Each button is packed and then bound to its left-click handler.
    for caption, handler in (("Exit", stopProg), ("Click Me", transfertext)):
        button = tkinter.Button(root, text=caption)
        button.pack()
        button.bind('<Button-1>', handler)
    label1 = tkinter.Label(root, text="nothing to say")
    label1.pack()


async def tick():
    """Print "TICK n" with an increasing n every half second, forever."""
    ticks = 0
    while True:
        ticks = ticks + 1
        print("TICK", ticks)
        await asyncio.sleep(0.5)


async def main():
    """Start the GUI updater and the ticker as tasks and wait until the
    updater task finishes."""
    global T1, root
    root.tk.willdispatch()
    T1 = asyncio.create_task(updater(0.01))
    # Keep a reference to the ticker; only the updater is waited on.
    ticker = asyncio.create_task(tick())
    await asyncio.wait((T1,))

setupGUI()
asyncio.run(main())

Page 259

import asyncio
import tkinter
import threading

count = 0
root = None
label1 = None


def stopProg(e):
    """Event handler: quit the Tk root. The event ``e`` is unused."""
    root.quit()


def transfertext(e):
    """Event handler: add one to the global click count and show the new
    total in label1. The event ``e`` is unused."""
    global count
    count += 1
    label1.configure(text=count)


def setupGUI():
    """Build the window (Exit button, Click Me button, label); the caller
    starts the Tk main loop."""
    global root, label1
    root = tkinter.Tk()
    # Each button is packed and then bound to its left-click handler.
    for caption, handler in (("Exit", stopProg), ("Click Me", transfertext)):
        button = tkinter.Button(root, text=caption)
        button.pack()
        button.bind('<Button-1>', handler)
    label1 = tkinter.Label(root, text="nothing to say")
    label1.pack()


async def tick():
    """Print "TICK n" with an increasing n every half second, forever."""
    ticks = 0
    while True:
        ticks = ticks + 1
        print("TICK", ticks)
        await asyncio.sleep(0.5)


def runasyncio():
    """Thread target: run main() in a new event loop until it completes."""
    entry = main()
    asyncio.run(entry)


async def main():
    """Start the ticker as a task and wait on it."""
    ticker = asyncio.create_task(tick())
    await asyncio.wait((ticker,))

# Build the widgets on the main thread.
setupGUI()
# Run the asyncio event loop on a daemon thread.
t1 = threading.Thread(target=runasyncio, daemon=True)
t1.start()
# The Tk main loop runs on the main thread.
root.mainloop()

Page 262

import asyncio


async def main():
    """Run ``dir /`` as a subprocess, wait for it to finish and print the
    bytes it wrote to stdout."""
    p = await asyncio.create_subprocess_exec(
        "dir", "/", stdout=asyncio.subprocess.PIPE)
    # Only stdout is piped, so that is the only stream printed.
    captured = await p.communicate()
    print(captured[0])
    print("finished")

asyncio.run(main())

Page 263

import asyncio


async def main():
    """Exchange one line with ./myscript.sh: read a line, send b"mike\\n",
    read the reply, printing each line received."""
    p = await asyncio.create_subprocess_exec(
        "./myscript.sh",
        stdout=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE)
    first = await p.stdout.readline()
    print("message", first)
    p.stdin.write(b"mike\n")
    reply = await p.stdout.readline()
    print("message", reply)

asyncio.run(main())

Page 264

import asyncio

async def main():
    """Talk to ./myscript.sh with a time limit on the first read.

    Up to 100 bytes of initial output are read, giving up after 0.1 seconds.
    Then b"mike\\n" is sent, the reply line is read, and the process is
    awaited.
    """
    p = await asyncio.create_subprocess_exec(
        "./myscript.sh",
        stdout=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE)
    try:
        # wait_for raises TimeoutError and cancels the read when the limit
        # passes; asyncio.wait() would just return with the read pending.
        first = await asyncio.wait_for(p.stdout.read(100), timeout=0.1)
    except asyncio.TimeoutError:
        first = b""
    print("message", first)
    p.stdin.write(b"mike\n")
    # Make sure the line has actually been handed to the child.
    await p.stdin.drain()
    ans = await p.stdout.readline()
    print("message", ans)
    await p.wait()

asyncio.run(main())

Page 268

import asyncio


def myFunction(value):
    """Callback scheduled on the event loop: print ``value``."""
    print(value)


myLoop = asyncio.new_event_loop()
# Scheduled to run 3 seconds from now.
myLoop.call_later(3, myFunction, 43)
# Scheduled for an absolute loop time, 2 seconds ahead of the current one.
t = myLoop.time()+2
myLoop.call_at(t, myFunction, 44)
# Scheduled to run as soon as the loop runs.
myLoop.call_soon(myFunction, 42)

myLoop.call_soon(myFunction, 45)
# Nothing here calls stop(), so the loop keeps running after the callbacks.
myLoop.run_forever()

Page 269

import asyncio


async def myFunction1(value):
    """Yield to the event loop once, then print ``value``."""
    await asyncio.sleep(0)
    print(value)


async def myFunction2(value):
    """Print ``value``, then yield to the event loop once."""
    print(value)
    await asyncio.sleep(0)

myLoop = asyncio.new_event_loop()
# Tasks are created on the loop before it is running.
T1 = myLoop.create_task(myFunction1(42))
T2 = myLoop.create_task(myFunction2(43))

# Nothing here calls stop(), so the loop keeps running after the tasks end.
myLoop.run_forever()

Page 271

import asyncio
import threading
import time


def myMakeTask(loop, cor, value):
    """From any thread, ask ``loop`` to create a task running ``cor(value)``.

    The coroutine object and the task are both created inside the callback,
    i.e. on the loop's own thread.
    """
    def _create():
        return loop.create_task(cor(value))

    loop.call_soon_threadsafe(_create)


async def myFunction1(value):
    """Yield to the event loop once, then print ``value``."""
    await asyncio.sleep(0)
    print(value)


async def myFunction2(value):
    """Print ``value``, then yield to the event loop once."""
    print(value)
    await asyncio.sleep(0)

myLoop = None


def myThreadLoop():
    """Thread target: create a new event loop, publish it as the global
    myLoop, queue two tasks and run the loop forever."""
    global myLoop
    myLoop = asyncio.new_event_loop()
    # Keep references to the tasks for as long as the loop runs.
    first = myLoop.create_task(myFunction1(42))
    second = myLoop.create_task(myFunction2(43))
    myLoop.run_forever()


t = threading.Thread(target=myThreadLoop)
t.start()
time.sleep(0)
# Busy-wait until the thread has assigned the global myLoop.
while not myLoop:
    pass
# Ask the loop running on the other thread to start one more task.
myMakeTask(myLoop, myFunction1, 44)
# The thread runs the loop forever, so this join does not return by itself.
t.join()

Page 272

import asyncio
import concurrent.futures


def myFunction1(value):
    """Print ``value`` and return it unchanged."""
    print(value)
    return value


if __name__ == '__main__':
    pool = concurrent.futures.ProcessPoolExecutor()
    myLoop = asyncio.new_event_loop()

    # Each call submits myFunction1 to the process pool and returns an
    # awaitable for its result.
    T1 = myLoop.run_in_executor(pool, myFunction1, 42)
    T2 = myLoop.run_in_executor(pool, myFunction1, 43)

    # Drive the loop until both calls have completed.
    myLoop.run_until_complete(asyncio.wait([T1, T2]))
    print(T1.result())
    print(T2.result())
    pool.shutdown()

Page 273

import asyncio
import concurrent.futures
import time


def myPi(m, n):
    """Return four times the partial sum of the alternating series
    (+/-)1/(2k-1) for k = m..n inclusive (positive terms for odd k)."""
    total = 0
    for k in range(m, n+1):
        sign = -1 if k % 2 == 0 else 1
        total += sign / (2 * k - 1)
    return total*4


if __name__ == '__main__':
    myLoop = asyncio.new_event_loop()

    N = 10000000
    # One pool only: the with block shuts it down when the work is finished.
    with concurrent.futures.ProcessPoolExecutor() as pool:
        t1 = time.perf_counter()
        # Split the series into two halves, one per worker process.
        T1 = myLoop.run_in_executor(pool, myPi, 1, N//2)
        T2 = myLoop.run_in_executor(pool, myPi, N//2+1, N)
        myLoop.run_until_complete(asyncio.wait([T1, T2]))
        t2 = time.perf_counter()
    # The loop is no longer needed once both results are available.
    myLoop.close()

    PI = T1.result()
    PI += T2.result()
    # Elapsed time in milliseconds, then the computed value.
    print((t2-t1)*1000)
    print(PI)

Page 276

import asyncio


async def main():
    """Send one UDP datagram from 192.168.253.20:8080 to
    192.168.253.14:8080 and close the transport."""
    loop = asyncio.get_running_loop()
    # The protocol class itself serves as the zero-argument factory.
    transport, protocol = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol,
        local_addr=("192.168.253.20", 8080))

    payload = b"Hello UDP World"
    transport.sendto(payload, addr=("192.168.253.14", 8080))
    transport.close()

asyncio.run(main())

Page 277

import asyncio


class ClientDatagramProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that prints every datagram it receives."""

    def datagram_received(self, data, addr):
        """Decode ``data`` as UTF-8 and print it with the sender address."""
        text = data.decode("utf8")
        print("Received", text, "from", addr)


async def main():
    """Listen for UDP datagrams on 192.168.253.14:8080 for 1000 seconds,
    then close the transport."""
    loop = asyncio.get_running_loop()
    # The protocol class itself serves as the zero-argument factory.
    transport, protocol = await loop.create_datagram_endpoint(
        ClientDatagramProtocol,
        local_addr=('192.168.253.14', 8080))
    await asyncio.sleep(1000)
    transport.close()

asyncio.run(main())

Page 279

import asyncio


async def main():
    """Send a UDP datagram to the 192.168.253.255 address using the socket
    that belongs to a datagram transport."""
    loop = asyncio.get_running_loop()
    # allow_broadcast=True is requested when the endpoint is created.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: asyncio.DatagramProtocol(),
        local_addr=("192.168.253.20", 8080), allow_broadcast=True)
    # Fetch the socket object the transport exposes as extra info.
    sock = transport.get_extra_info('socket')
    print(sock)
    data = b"Hello UDP World"
    # Connect the socket to the destination, then send through the loop's
    # low-level socket API rather than transport.sendto().
    await loop.sock_connect(sock, ("192.168.253.255", 8080))
    await loop.sock_sendall(sock, data)
    transport.close()

asyncio.run(main())

Page 280

import asyncio
import socket


async def main():
    """Broadcast one UDP datagram to 192.168.253.255:8080 using a plain
    socket driven by the event loop's low-level socket API."""
    loop = asyncio.get_running_loop()

    # The with block closes the socket even if connecting or sending fails.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        # The loop.sock_* methods are documented to need a non-blocking socket.
        sock.setblocking(False)

        data = b"Hello UDP World"
        await loop.sock_connect(sock, ("192.168.253.255", 8080))
        await loop.sock_sendall(sock, data)

asyncio.run(main())