# Directory Structure

```
├── .gitignore
├── .python-version
├── LICENSE.md
├── pyproject.toml
├── README.md
├── src
│   ├── tuyactl
│   │   ├── __main__.py
│   │   └── cli.py
│   └── tuyad
│       └── __main__.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.13

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
*.json
__pycache__


```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# tuyactl

This project provides a command-line interface (`tuyactl`) for controlling Tuya devices. It talks to a separate server daemon (`tuyad`), which is included in this repository. I'm still thinking about a better way of doing that.

## Requirements

*   **uv:** A fast and modern Python package installer and runner.  Install it by following the instructions on the [uv documentation site](https://docs.astral.sh/uv/installation/).
*   **Tuya Local Keys:** You will need the local keys for your Tuya devices. Follow the [tinytuya setup wizard](https://github.com/jasonacox/tinytuya#setup-wizard---getting-local-keys) to obtain these.

## Quick Start

1.  **Install `uv`:**

    Follow the official installation instructions on the [uv documentation site](https://docs.astral.sh/uv/installation/). The recommended method is to use the standalone installer, which you can download and run with the following command:

    ```bash
    curl -LsSf https://astral.sh/uv/install.sh | sh
    ```

2.  **Obtain Tuya Local Keys:**

    Follow the [tinytuya setup wizard](https://github.com/jasonacox/tinytuya#setup-wizard---getting-local-keys) to get the local keys for your Tuya devices.  Place the resulting `snapshot.json` file in your home directory (`~`). You can customize the location of this file using environment variables (see below).

3.  **Run the server:**

    ```bash
    nohup tuyad > tuyad.log 2>&1 &
    ```

4.  **Run `tuyactl`:**

    To see the available commands and options, run:

    ```bash
    tuyactl --help
    ```

    To execute a specific command, use the following syntax:

    ```bash
    tuyactl <command> [options]
    ```

    Replace `<command>` with one of the available commands: `list`, `on`, `off`, `color`, `brightness`, `temperature`, `mode`, `music`.  Use the `--help` option to see the available options for each command.

    For example, to list all your Tuya devices, run:

    ```bash
    tuyactl list
    ```
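
Under the hood, each command becomes an HTTP request to `tuyad` on `http://localhost:5000`. The sketch below builds (but does not send) the request that `tuyactl on <device>` produces, based on the endpoint names in `src/tuyactl/__main__.py`. It uses the standard library's `urllib` instead of `requests` so it runs without extra packages; `build_request` and the device name `lamp` are made up for this example.

```python
import json
import urllib.request

BASE_URL = "http://localhost:5000"  # same default as tuyactl


def build_request(endpoint, parameters=None):
    """Build the request tuyactl would send: GET without parameters, POST with a JSON body."""
    url = f"{BASE_URL}/{endpoint}"
    if parameters is None:
        return urllib.request.Request(url, method="GET")
    body = json.dumps(parameters).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# `tuyactl on lamp` maps to the `turn_on` endpoint ("lamp" is a hypothetical device name)
req = build_request("turn_on", {"devices": ["lamp"]})
print(req.get_method(), req.full_url, req.data.decode())
```

Passing the result to `urllib.request.urlopen` would actually send it, which only works while `tuyad` is running.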

## Configuration

*   **`snapshot.json` Location:** By default, `tuyad` reads `~/snapshot.json` (the file containing your Tuya device keys). To use a different location, set the `DEVICES` environment variable to the file's path before starting `tuyad`.
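
The lookup can be sketched as follows. This mirrors the `DEVICES_FILE` line in `src/tuyad/__main__.py`; the helper name `resolve_devices_file` is an assumption introduced for this example and does not exist in the project.

```python
import os


def resolve_devices_file(environ=os.environ):
    """Return $DEVICES if it is set, otherwise fall back to ~/snapshot.json."""
    return environ.get("DEVICES", os.path.expanduser("~/snapshot.json"))


if __name__ == "__main__":
    # Prints the path tuyad would read with the current environment
    print(resolve_devices_file())
```

Taking the environment as an argument keeps the function easy to try with a plain dictionary instead of the real process environment.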


```

--------------------------------------------------------------------------------
/LICENSE.md:
--------------------------------------------------------------------------------

```markdown
Copyright (c) 2025 Cabra Lattice (@cabra.lat) 

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

```

--------------------------------------------------------------------------------
/src/tuyactl/cli.py:
--------------------------------------------------------------------------------

```python
from tuyactl.__main__ import main

if __name__ == '__main__':
    main()
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "tuyactl"
version = "0.0.1"
description = "A server and client cli tool for controlling Tuya devices"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
 "aiofiles",
 "colour>=0.1.5",
 "numpy>=2.2.2",
 "quart>=0.20.0",
 "requests",
 "scipy>=1.15.1",
 "tinytuya>=1.16.1",
]
[[project.authors]]
name = "@cabra.lat"
email = "[email protected]"


[project.scripts]
tuyactl = "tuyactl.cli:main"
tuyad   = "tuyad.__main__:main"

[build-system]
requires = [ "hatchling",]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/tuyactl", "src/tuyad"]

```

--------------------------------------------------------------------------------
/src/tuyactl/__main__.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
import requests
import json
import sys
import argparse

BASE_URL = 'http://localhost:5000'

def send_command(command, parameters=None):
    url = f'{BASE_URL}/{command}'
    headers = {'Content-type': 'application/json'}
    try:
        if parameters:
            print(f"Sending command: {url} with parameters {parameters}")
            response = requests.post(url, headers=headers, data=json.dumps(parameters))
        else:
            print(f"Sending command: {url}")
            response = requests.get(url, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        return {"status": "error", "message": str(e)}

def main():
    parser = argparse.ArgumentParser(description='Control Tuya devices.')
    subparsers = parser.add_subparsers(title='commands', dest='command', help='Available commands')

    # Device list command
    device_list_parser = subparsers.add_parser('list', help='List all devices')

    # Turn on command
    on_parser = subparsers.add_parser('on', help='Turn on a device')
    on_parser.add_argument('devices', type=str, nargs='*', help='Device(s) name(s)')
    on_parser.add_argument('--all',   action='store_true', help='Turn on all devices')

    # Turn off command
    off_parser = subparsers.add_parser('off', help='Turn off a device')
    off_parser.add_argument('devices', type=str, nargs='*', help='Device(s) name(s)')
    off_parser.add_argument('--all',   action='store_true', help='Turn off all devices')

    # Set color command
    color_parser = subparsers.add_parser('color', help='Set the color of a device')
    color_parser.add_argument('color',   type=str, help='Color value')
    color_parser.add_argument('devices', type=str, nargs='*', help='Device(s) name(s)')
    color_parser.add_argument('--all',   action='store_true', help='Set color for all devices')

    # Set brightness command
    brightness_parser = subparsers.add_parser('brightness', help='Set the brightness of a device')
    brightness_parser.add_argument('brightness', type=int, help='Brightness value')
    brightness_parser.add_argument('devices', nargs='*', type=str, help='Device(s) name(s)')
    brightness_parser.add_argument('--all',   action='store_true', help='Set brightness for all devices')

    # Set colour temperature command
    colourtemp_parser = subparsers.add_parser('temperature', help='Set the colour temperature of a device')
    colourtemp_parser.add_argument('temperature', type=int,  help='Colour temperature value')
    colourtemp_parser.add_argument('devices', nargs='*', type=str, help='Device(s) name(s)')
    colourtemp_parser.add_argument('--all',   action='store_true', help='Set colour temperature for all devices')

    # Set mode command
    mode_parser = subparsers.add_parser('mode', help='Set the mode of a device')
    mode_parser.add_argument('mode', type=str, choices=['white', 'colour', 'scene', 'music'], help='Mode value')
    mode_parser.add_argument('devices', nargs='*', type=str, help='Device(s) name(s)')
    mode_parser.add_argument('--all',   action='store_true', help='Set mode for all devices')

    # Music sim command
    music_parser = subparsers.add_parser('music', help='Simulate music mode for a device')
    music_parser.add_argument('devices', nargs='*', type=str, help='Device(s) name(s)')
    music_parser.add_argument('--all',   action='store_true', help='Music mode using all devices')
    music_parser.add_argument('--stop',  action='store_true', help='Stop music mode for selected devices')

    args = parser.parse_args()

    parameters = {}
    endpoint = args.command
    if args.command == 'on':
        endpoint = 'turn_on'
        if args.all:
            parameters["all"] = True
        elif args.devices:
            parameters["devices"] = args.devices
        else:
            print("Error: Device(s) name(s) required when not using --all")
            sys.exit(1)

    elif args.command == 'off':
        endpoint = 'turn_off'
        if args.all:
            parameters["all"] = True
        elif args.devices:
            parameters["devices"] = args.devices
        else:
            print("Error: Device(s) name(s) required when not using --all")
            sys.exit(1)

    elif args.command == 'color':
        endpoint = 'set_color'
        if args.all:
            parameters["all"] = True
        elif args.devices:
            parameters["devices"] = args.devices
        if args.color:
            parameters["color"] = args.color

    elif args.command == 'brightness':
        endpoint = 'set_brightness'
        if args.all:
            parameters["all"] = True
        elif args.devices:
            parameters["devices"] = args.devices
        # brightness is a required positional; send it even when it is 0
        parameters["brightness"] = args.brightness

    elif args.command == 'temperature':
        endpoint = 'set_temperature'
        if args.all:
            parameters["all"] = True
        elif args.devices:
            parameters["devices"] = args.devices
        # temperature is a required positional; send it even when it is 0
        parameters["temperature"] = args.temperature

    elif args.command == 'mode':
        endpoint = 'set_mode'
        if args.all:
            parameters["all"] = True
        elif args.devices:
            parameters["devices"] = args.devices
        if args.mode:
            parameters["mode"] = args.mode

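    elif args.command == 'music':
        if args.all:
            parameters["all"] = True
        elif args.devices:
            parameters["devices"] = args.devices
        if args.stop:
            parameters["stop"] = args.stop
        if not parameters:
            # /music only accepts POST; with no parameters send_command would issue a GET
            print("Error: Device(s) name(s) required when not using --all or --stop")
            sys.exit(1)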

    elif args.command == 'list':
        # 'list' takes no parameters and maps directly to the /list endpoint
        pass

    if endpoint:
        result = send_command(endpoint, parameters if parameters else None)
        # On success /list returns a JSON array; errors come back as a dict
        if endpoint == 'list' and isinstance(result, list):
            for device in result:
                print(device)
            return
        print(json.dumps(result))
        return

    parser.print_help()

if __name__ == '__main__':
    main()

```

--------------------------------------------------------------------------------
/src/tuyad/__main__.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
import tinytuya
import json
import os
import struct
import subprocess
import asyncio
import aiofiles
import numpy as np
import time
import random
import logging
from scipy.fft import fft
from scipy.signal import butter, filtfilt
from quart import Quart, request, jsonify
from colour import Color
from collections import deque

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")

DEVICES_FILE = os.environ.get('DEVICES', os.path.expanduser('~/snapshot.json'))
XDG_RUNTIME_DIR = os.environ.get('XDG_RUNTIME_DIR', f'/run/user/{os.getuid()}')
AUDIO_TARGET = os.environ.get('TUYA_MCP_AUDIO_TARGET', 'alsa_output.pci-0000_00_1b.0.analog-stereo.monitor')
CHANNELS    = 2
SAMPLE_RATE = 48000
BUFFER_SIZE = SAMPLE_RATE * CHANNELS # 16 bits LE gives 500ms of data
CHUNK_SIZE  = 1024

app = Quart(__name__)
devices = []
stop_event = asyncio.Event()

async def load_devices():
    global devices
    try:
        async with aiofiles.open(DEVICES_FILE, 'r') as f:
            data = await f.read()
            devices = json.loads(data).get("devices", [])
        logging.info("Devices loaded")
    except FileNotFoundError:
        logging.error(f"Error: {DEVICES_FILE} not found.")
        raise
    except json.JSONDecodeError:
        logging.error(f"Error: Invalid JSON format in {DEVICES_FILE}.")
        raise

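async def control_device(device, action, *args, timeout=0.5, retries=1, **kwargs):
    # Resolve the name first so the except block can always reference it
    device_name = device.get('name')
    try:
        device_id   = device['id'  ]
        local_key   = device['key' ]
        ip_address  = device['ip'  ]
        # Convert to float, as audio_consumer does; the snapshot may store it as a string
        version     = float(device['ver'])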

        d = tinytuya.BulbDevice(
                device_id,
                ip_address,
                local_key,
                connection_retry_limit=retries,
                connection_timeout=timeout)
        d.set_version(version)
        function = getattr(d, action)
        logging.info(f"Executing {action} on {device_name}: {args} {kwargs}")
        function(*args, **kwargs)
        logging.info(f"Executed {action} on {device_name}")
    except Exception as e:
        logging.error(f"Error controlling {device_name or 'unknown device'}: {e}")
        raise

async def handle_action_over_devices(action, request_data, *args, **kwargs):
    all_devices = request_data.get('all', False)
    devices_names = request_data.get('devices', [])

    kwargs['nowait'] = kwargs.get('nowait',True)

    if not all_devices and not devices_names:
        return jsonify({"status": "error", "message": "At least one device name must be provided"}), 400

    tasks = []
    for device in devices:
        if device.get('name') in devices_names or all_devices:
            tasks.append(asyncio.create_task(control_device(device, action, *args, **kwargs)))

    return jsonify({
        "status": "success",
        "message": f"Action {action} executed over {'all devices' if all_devices else 'devices: '}{','.join(devices_names)}."
    })

def generate_dp27_payload(mode: int,
                          hue: int,
                          saturation: int,
                          value: int,
                          white_brightness: int = 0,
                          temperature: int = 1000) -> str:
    """
    Generate a DP27 payload for Tuya music mode.

    :param mode: 0 for jumping mode, 1 for gradient mode.
    :param hue: Hue value (0-360).
    :param saturation: Saturation (0-1000).
    :param value: Value, i.e. colour brightness (0-1000).
    :param white_brightness: White brightness (0-1000), default is 0.
    :param temperature: Color temperature (0-1000), default is 1000.
    :return: DP27 string payload (21 hexadecimal digits).
    """
    # Field order: mode (1 digit), then hue, saturation, value,
    # white brightness and temperature (4 digits each)
    return f"{mode:01X}{hue:04X}{saturation:04X}{value:04X}{white_brightness:04X}{temperature:04X}"


class AsyncDeque(deque):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._not_empty = asyncio.Condition()  # For signaling when items are added
        self._stopped = False  # To indicate when the deque is stopped

    def __aiter__(self):
        return self

    async def __anext__(self):
        async with self._not_empty:
            while not self and not self._stopped:
                await self._not_empty.wait()  # Wait until an item is added or stopped
            if self._stopped and not self:
                raise StopAsyncIteration
            return self.popleft()

    async def put(self, item):
        """Add an item to the deque and notify waiting consumers."""
        async with self._not_empty:
            self.append(item)
            self._not_empty.notify()

    async def stop(self):
        """Stop all waiting consumers by notifying them."""
        async with self._not_empty:
            self._stopped = True
            self._not_empty.notify_all()

    async def __aenter__(self):
        """Enter context, returning the deque."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit context, ensuring proper cleanup."""
        await self.stop()

class AudioBufferManager:
    def __init__(self):
        self.sample_rate = SAMPLE_RATE
        self.chunk_size  = CHUNK_SIZE 
        self.max_buffer_size = 1
        self.consumers = []
        self.lock = asyncio.Lock()

    async def add_audio_data(self, data):
        """Add new audio data to the buffer and all consumer queues"""
        async with self.lock:
            # Add to all consumer queues
            for queue in self.consumers:
                await queue.put(data)

    def register_consumer(self):
        """Create a new async generator for a consumer"""
        queue = AsyncDeque(maxlen=self.max_buffer_size)
        self.consumers.append(queue)
        return queue

buffer_manager = AudioBufferManager()

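async def audio_reader():
    global stop_event, buffer_manager

    process = await asyncio.create_subprocess_exec(
        #'parec', f'--device={AUDIO_TARGET}', '--format=s16', f'--rate={SAMPLE_RATE}',
        'pw-record', f'--target={AUDIO_TARGET}', '--format=s16', f'--rate={SAMPLE_RATE}', '-',
        # Extend the inherited environment instead of replacing it (keeps PATH, HOME, ...)
        env={**os.environ, 'XDG_RUNTIME_DIR': XDG_RUNTIME_DIR},
        stdout=asyncio.subprocess.PIPE,
    )
    logging.info("Initializing audio reader task.")

    try:
        while not stop_event.is_set():
            try:
                # readexactly keeps every chunk aligned to whole 16-bit samples
                chunk = await process.stdout.readexactly(CHUNK_SIZE)
            except asyncio.IncompleteReadError:
                # EOF: the recorder exited, so stop instead of spinning on empty reads
                logging.error("Audio source closed the stream")
                break
            await buffer_manager.add_audio_data(chunk)
    except Exception as e:
        logging.error(f"Audio reader error: {e}")
        raise
    finally:
        # Do not leave the recorder running once music mode stops
        if process.returncode is None:
            process.terminate()
            await process.wait()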

async def audio_consumer(device, freq_range, *args, timeout=0.5, retries=1, **kwargs):
    global stop_event, buffer_manager
    consumer = buffer_manager.register_consumer()
    device_id   = device['id']
    device_name = device['name']
    local_key   = device['key']
    ip_address  = device['ip']
    version     = float(device['ver'])

    d = tinytuya.BulbDevice(
        device_id,
        ip_address,
        local_key,
        connection_retry_limit=retries,
        connection_timeout=timeout)
    d.set_version(version)
    d.set_socketPersistent(True)
    d.set_mode('music')

    status = d.status()
    error_message = status.get('Error',None)
    if error_message:
        logging.error(f'{device_name} error: {error_message}')
        return

    logging.info(f"{device_name} will handle {freq_range[0]} Hz - {freq_range[1]} Hz")

    # Parameters for beat detection and processing
    BEAT_THRESHOLD_FACTOR = 1.5  #  Base beat detection threshold.  Dynamic adjustment target.
    DYNAMIC_ADJUSTMENT_RATE = 0.01 # How quickly the threshold adapts
    RUNNING_AVG_COEFF = 0.90     # For smoothing the volume (increased responsiveness, was 0.975)
    MIN_BRIGHTNESS = 0  # Ensure this is never negative.
    MAX_BRIGHTNESS = 1000
    MIN_SATURATION = 0
    MAX_SATURATION = 1000
    
    # Beat detection variables
    history_length = int(SAMPLE_RATE / CHUNK_SIZE)  # 1 second history
    volume_history = np.zeros(history_length)
    current_index = 0
    dynamic_threshold = 0
    last_beat_time = 0

    # Color rotation parameters
    sensitivity=1.5       # Beat detection sensitivity (higher = more sensitive)
    decay_rate=0.97       # How quickly peaks decay over time
    flash_duration=0.1    # Duration of light flash in seconds

    hue = 0
    brightness = 0
    saturation = 0

    def calculate_rms(audio_data):
        """Calculate root mean square of audio chunk"""
        return np.sqrt(np.mean(np.square(audio_data)))

    def is_beat(current_volume):
        """Improved beat detection using dynamic threshold"""
        nonlocal dynamic_threshold, last_beat_time
        # Update dynamic threshold
        dynamic_threshold = decay_rate * dynamic_threshold + (1 - decay_rate) * current_volume
        
        # Check if current volume exceeds threshold and cooldown has passed
        now = time.time()
        if current_volume > dynamic_threshold * sensitivity and (now - last_beat_time) > flash_duration:
            last_beat_time = now
            return True
        return False

    async def fade_back(device, duration):
        """Gradual fade back to rhythm-based brightness"""
        start_time = time.time()
        while time.time() - start_time < duration:
            brightness = int(1000 * (1 - (time.time() - start_time)/duration))
            value = generate_dp27_payload(0, hue, saturation, brightness)
            payload = device.generate_payload(tinytuya.CONTROL, {"27": value})
            device.send(payload)
            await asyncio.sleep(0.05)

    async def fft_analysis(np_audio):
        # FFT analysis restricted to this device's frequency band
        fft_data = np.fft.rfft(np_audio)
        frequencies = np.fft.rfftfreq(len(np_audio), 1/SAMPLE_RATE)
        mask = (frequencies >= freq_range[0]) & (frequencies <= freq_range[1])
        fft_abs = np.abs(fft_data[mask])
        if fft_abs.size == 0:
            # No FFT bin falls inside the band; report silence and keep the current hue
            return 0.0, 0.0
        energy = np.sum(fft_abs)
        # Convert to dB scale
        volume_db = 10 * np.log10(energy + 1e-9)  # Add epsilon to avoid log(0)

        # Strongest bin inside the band
        max_index = np.argmax(fft_abs)
        dominant_frequency = frequencies[mask][max_index]

        return dominant_frequency, volume_db

    try:
        async for audio_chunk in consumer:
            if stop_event.is_set(): break
            np_audio = np.frombuffer(audio_chunk, dtype=np.int16).astype(float)

            frequency, volume = await fft_analysis(np_audio)
            
            hue = int(frequency / freq_range[1] * 360) if frequency > 0 else hue

            # Beat detection
            if is_beat(volume):
                # Flash on beat
                brightness = MAX_BRIGHTNESS
                saturation = MAX_SATURATION
            
                # Schedule return to rhythm-based brightness
                asyncio.create_task(fade_back(d, flash_duration))
            else:
                # Base rhythm mode
                brightness = int(np.clip(volume * 10, 0, 1000))  # Map volume to brightness
                saturation = 1000

            value = generate_dp27_payload(0, hue, saturation, brightness)  # mode 0 = jumping
            logging.info(f"Sending {value} (" + \
                         f"H: {hue:03d} " + \
                         f"S: {saturation:04d} " + \
                         f"V: {brightness:04d} " + \
                         f"vol: {volume:10.2f} dB " + \
                         f"freq: {frequency:10.2f} Hz) " + \
                         f"to {device_name}")
            payload = d.generate_payload(tinytuya.CONTROL, {"27": value})
            d.send(payload)

    except Exception as e:
        logging.error(f"Error controlling {device_name or 'unknown device'}: {e}")
        d.set_white()
        raise

async def music_mode(request_data):
    global stop_event

    stop = request_data.get('stop', False)
    all_devices = request_data.get('all', False)
    devices_names = request_data.get('devices', [])
    
    if stop:
        stop_event.set()
        return jsonify({"status": "success", "message": "Music mode stopped"})
    
    stop_event.clear()
    selected_devices = [d for d in devices if all_devices or d.get('name') in devices_names]
    if not selected_devices:
        return jsonify({"status": "error", "message": "No matching devices found"}), 404
    
    minf = 10
    maxf = 20000
    step = (maxf - minf) // len(selected_devices)
    frequency_ranges = [(minf + i * step, minf + (i + 1) * step) for i in range(len(selected_devices))]
    
    asyncio.create_task(audio_reader())

    tasks = []
    for device, freq_range in zip(selected_devices, frequency_ranges):
        task = asyncio.create_task(audio_consumer(device, freq_range))
        tasks.append(task)
    
    return jsonify({"status": "success", "message": "Music mode started"})

@app.route('/list', methods=['GET'])
async def device_list():
    await load_devices()
    return jsonify([{ 'name': d.get('name'), 'id': d['id'], 'ip': d['ip'], 'version': d["ver"] } for d in devices])

@app.route('/turn_on', methods=['POST'])
async def turn_on():
    request_data = await request.get_json()
    return await handle_action_over_devices('turn_on', request_data)

@app.route('/turn_off', methods=['POST'])
async def turn_off():
    request_data = await request.get_json()
    return await handle_action_over_devices('turn_off', request_data)

@app.route('/set_mode', methods=['POST'])
async def set_mode():
    request_data = await request.get_json()
    mode = request_data.get('mode')
    return await handle_action_over_devices('set_mode', request_data, mode)

@app.route('/set_brightness', methods=['POST'])
async def set_brightness():
    request_data = await request.get_json()
    brightness = request_data.get('brightness')
    return await handle_action_over_devices('set_brightness', request_data, brightness)

@app.route('/set_temperature', methods=['POST'])
async def set_temperature():
    request_data = await request.get_json()
    temperature = request_data.get('temperature')
    return await handle_action_over_devices('set_colourtemp', request_data, temperature)

@app.route('/set_color', methods=['POST'])
async def set_color():
    request_data = await request.get_json()
    color_input = request_data.get('color')
    r, g, b = Color(color_input).rgb
    rgb = int(255 * r), int(255 * g), int(255 * b)
    return await handle_action_over_devices('set_colour', request_data, *rgb)

@app.route('/music', methods=['POST'])
async def music():
    request_data = await request.get_json()
    return await music_mode(request_data)

def main():
    asyncio.run(load_devices())
    logging.info("Daemon started")
    app.run(host='0.0.0.0', port=5000, debug=False)

if __name__ == '__main__':
    main()

```
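
The beat detection inside `audio_consumer` can be read in isolation as an exponential moving average of the volume combined with a cooldown. The sketch below restates that logic as a standalone class so it can be tried without audio hardware or bulbs. `BeatDetector`, `update`, and the explicit `now` argument are names introduced here for illustration and are not part of `tuyad`; the default values are copied from `sensitivity`, `decay_rate`, and `flash_duration` above.

```python
from dataclasses import dataclass


@dataclass
class BeatDetector:
    sensitivity: float = 1.5   # volume must exceed threshold * sensitivity
    decay_rate: float = 0.97   # weight of the previous threshold in the moving average
    cooldown: float = 0.1      # minimum seconds between two reported beats
    threshold: float = 0.0
    last_beat: float = float("-inf")

    def update(self, volume: float, now: float) -> bool:
        """Feed one volume sample taken at time `now`; return True if it counts as a beat."""
        # Exponential moving average, updated before the comparison as in audio_consumer
        self.threshold = self.decay_rate * self.threshold + (1 - self.decay_rate) * volume
        loud_enough = volume > self.threshold * self.sensitivity
        rested = (now - self.last_beat) > self.cooldown
        if loud_enough and rested:
            self.last_beat = now
            return True
        return False


# Warm up on a steady signal, then feed a sudden spike
detector = BeatDetector()
for i in range(300):
    detector.update(10.0, i * 0.01)
steady = detector.update(10.0, 3.00)
spike = detector.update(30.0, 3.01)
repeat = detector.update(30.0, 3.02)
print(steady, spike, repeat)
```

Because the threshold starts at zero, the first samples of any non-silent signal register as beats until the average catches up, which is also how the daemon behaves right after music mode starts.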