<message>Update the _set_golden_from_path function to improve the handling of existing golden image files. Replace the existing unlink logic with a more robust method that safely removes files or broken symlinks using the missing_ok parameter. This change enhances the reliability of the backup upload process by ensuring that stale references are properly cleared before setting a new golden image path.
228 lines
8.9 KiB
Python
Executable File
228 lines
8.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Write a PNG image representing Starlink obstruction map data.
|
|
|
|
This scripts queries obstruction map data from the Starlink user terminal
|
|
(dish) reachable on the local network and writes a PNG image based on that
|
|
data.
|
|
|
|
Each pixel in the image represents the signal quality in a particular
|
|
direction, as observed by the dish. If the dish has not communicated with
|
|
satellites located in that direction, the pixel will be the "no data" color;
|
|
otherwise, it will be a color in the range from the "obstructed" color (no
|
|
signal at all) to the "unobstructed" color (sufficient signal quality for full
|
|
signal).
|
|
|
|
The coordinates of the pixels are the altitude and azimuth angles from the
|
|
horizontal coordinate system representation of the sky, converted to Cartesian
|
|
(rectangular) coordinates. The conversion is done in a way that maps all valid
|
|
directions into a circle that touches the edges of the image. Pixels outside
|
|
that circle will show up as "no data".
|
|
|
|
Azimuth is represented as angle from a line drawn from the center of the image
|
|
to the center of the top edge of the image, where center-top is 0 degrees
|
|
(North), the center of the right edge is 90 degrees (East), etc.
|
|
|
|
Altitude (elevation) is represented as distance from the center of the image,
|
|
where the center of the image represents vertical up from the point of view of
|
|
an observer located at the dish (zenith, which is usually not the physical
|
|
direction the dish is pointing) and the further away from the center a pixel
|
|
is, the closer to the horizon it is, down to a minimum altitude angle at the
|
|
edge of the circle.
|
|
"""
|
|
|
|
import argparse
|
|
from datetime import datetime
|
|
import logging
|
|
import os
|
|
import png
|
|
import sys
|
|
import time
|
|
|
|
import starlink_grpc
|
|
|
|
DEFAULT_OBSTRUCTED_COLOR = "FFFF0000"
|
|
DEFAULT_UNOBSTRUCTED_COLOR = "FFFFFFFF"
|
|
DEFAULT_NO_DATA_COLOR = "00000000"
|
|
DEFAULT_OBSTRUCTED_GREYSCALE = "FF00"
|
|
DEFAULT_UNOBSTRUCTED_GREYSCALE = "FFFF"
|
|
DEFAULT_NO_DATA_GREYSCALE = "0000"
|
|
LOOP_TIME_DEFAULT = 0
|
|
|
|
|
|
def loop_body(opts, context):
|
|
try:
|
|
snr_data = starlink_grpc.obstruction_map(context)
|
|
except starlink_grpc.GrpcError as e:
|
|
logging.error("Failed getting obstruction map data: %s", str(e))
|
|
return 1
|
|
|
|
def pixel_bytes(row):
|
|
for point in row:
|
|
if point > 1.0:
|
|
# shouldn't happen, but just in case...
|
|
point = 1.0
|
|
|
|
if point >= 0.0:
|
|
if opts.greyscale:
|
|
yield round(point * opts.unobstructed_color_g +
|
|
(1.0-point) * opts.obstructed_color_g)
|
|
else:
|
|
yield round(point * opts.unobstructed_color_r +
|
|
(1.0-point) * opts.obstructed_color_r)
|
|
yield round(point * opts.unobstructed_color_g +
|
|
(1.0-point) * opts.obstructed_color_g)
|
|
yield round(point * opts.unobstructed_color_b +
|
|
(1.0-point) * opts.obstructed_color_b)
|
|
if not opts.no_alpha:
|
|
yield round(point * opts.unobstructed_color_a +
|
|
(1.0-point) * opts.obstructed_color_a)
|
|
else:
|
|
if opts.greyscale:
|
|
yield opts.no_data_color_g
|
|
else:
|
|
yield opts.no_data_color_r
|
|
yield opts.no_data_color_g
|
|
yield opts.no_data_color_b
|
|
if not opts.no_alpha:
|
|
yield opts.no_data_color_a
|
|
|
|
if opts.filename == "-":
|
|
# Open new stdout file to get binary mode
|
|
out_file = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)
|
|
else:
|
|
now = int(time.time())
|
|
filename = opts.filename.replace("%u", str(now))
|
|
filename = filename.replace("%d",
|
|
datetime.utcfromtimestamp(now).strftime("%Y_%m_%d_%H_%M_%S"))
|
|
filename = filename.replace("%s", str(opts.sequence))
|
|
out_file = open(filename, "wb")
|
|
if not snr_data or not snr_data[0]:
|
|
logging.error("Invalid SNR map data: Zero-length")
|
|
return 1
|
|
writer = png.Writer(len(snr_data[0]),
|
|
len(snr_data),
|
|
alpha=(not opts.no_alpha),
|
|
greyscale=opts.greyscale)
|
|
writer.write(out_file, (bytes(pixel_bytes(row)) for row in snr_data))
|
|
out_file.close()
|
|
|
|
opts.sequence += 1
|
|
return 0
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(
|
|
description="Collect directional obstruction map data from a Starlink user terminal and "
|
|
"emit it as a PNG image")
|
|
parser.add_argument(
|
|
"filename",
|
|
nargs="?",
|
|
help="The image file to write, or - to write to stdout; may be a template with the "
|
|
"following to be filled in per loop iteration: %%s for sequence number, %%d for UTC date "
|
|
"and time, %%u for seconds since Unix epoch.")
|
|
parser.add_argument(
|
|
"-o",
|
|
"--obstructed-color",
|
|
help="Color of obstructed areas, in RGB, ARGB, L, or AL hex notation, default: " +
|
|
DEFAULT_OBSTRUCTED_COLOR + " or " + DEFAULT_OBSTRUCTED_GREYSCALE)
|
|
parser.add_argument(
|
|
"-u",
|
|
"--unobstructed-color",
|
|
help="Color of unobstructed areas, in RGB, ARGB, L, or AL hex notation, default: " +
|
|
DEFAULT_UNOBSTRUCTED_COLOR + " or " + DEFAULT_UNOBSTRUCTED_GREYSCALE)
|
|
parser.add_argument(
|
|
"-n",
|
|
"--no-data-color",
|
|
help="Color of areas with no data, in RGB, ARGB, L, or AL hex notation, default: " +
|
|
DEFAULT_NO_DATA_COLOR + " or " + DEFAULT_NO_DATA_GREYSCALE)
|
|
parser.add_argument(
|
|
"-g",
|
|
"--greyscale",
|
|
action="store_true",
|
|
help="Emit a greyscale image instead of the default full color image; greyscale images "
|
|
"use L or AL hex notation for the color options")
|
|
parser.add_argument(
|
|
"-z",
|
|
"--no-alpha",
|
|
action="store_true",
|
|
help="Emit an image without alpha (transparency) channel instead of the default that "
|
|
"includes alpha channel")
|
|
parser.add_argument("-e",
|
|
"--target",
|
|
help="host:port of dish to query, default is the standard IP address "
|
|
"and port (192.168.100.1:9200)")
|
|
parser.add_argument("-t",
|
|
"--loop-interval",
|
|
type=float,
|
|
default=float(LOOP_TIME_DEFAULT),
|
|
help="Loop interval in seconds or 0 for no loop, default: " +
|
|
str(LOOP_TIME_DEFAULT))
|
|
parser.add_argument("-s",
|
|
"--sequence",
|
|
type=int,
|
|
default=1,
|
|
help="Starting sequence number for templatized filenames, default: 1")
|
|
parser.add_argument("-r",
|
|
"--reset",
|
|
action="store_true",
|
|
help="Reset obstruction map data before starting")
|
|
opts = parser.parse_args()
|
|
|
|
if opts.filename is None and not opts.reset:
|
|
parser.error("Must specify a filename unless resetting")
|
|
|
|
if opts.obstructed_color is None:
|
|
opts.obstructed_color = DEFAULT_OBSTRUCTED_GREYSCALE if opts.greyscale else DEFAULT_OBSTRUCTED_COLOR
|
|
if opts.unobstructed_color is None:
|
|
opts.unobstructed_color = DEFAULT_UNOBSTRUCTED_GREYSCALE if opts.greyscale else DEFAULT_UNOBSTRUCTED_COLOR
|
|
if opts.no_data_color is None:
|
|
opts.no_data_color = DEFAULT_NO_DATA_GREYSCALE if opts.greyscale else DEFAULT_NO_DATA_COLOR
|
|
|
|
for option in ("obstructed_color", "unobstructed_color", "no_data_color"):
|
|
try:
|
|
color = int(getattr(opts, option), 16)
|
|
if opts.greyscale:
|
|
setattr(opts, option + "_a", (color >> 8) & 255)
|
|
setattr(opts, option + "_g", color & 255)
|
|
else:
|
|
setattr(opts, option + "_a", (color >> 24) & 255)
|
|
setattr(opts, option + "_r", (color >> 16) & 255)
|
|
setattr(opts, option + "_g", (color >> 8) & 255)
|
|
setattr(opts, option + "_b", color & 255)
|
|
except ValueError:
|
|
logging.error("Invalid hex number for %s", option)
|
|
sys.exit(1)
|
|
|
|
return opts
|
|
|
|
|
|
def main():
|
|
opts = parse_args()
|
|
|
|
logging.basicConfig(format="%(levelname)s: %(message)s")
|
|
|
|
context = starlink_grpc.ChannelContext(target=opts.target)
|
|
|
|
try:
|
|
if opts.reset:
|
|
starlink_grpc.reset_obstruction_map(context)
|
|
|
|
if opts.filename is not None:
|
|
next_loop = time.monotonic()
|
|
while True:
|
|
rc = loop_body(opts, context)
|
|
if opts.loop_interval > 0.0:
|
|
now = time.monotonic()
|
|
next_loop = max(next_loop + opts.loop_interval, now)
|
|
time.sleep(next_loop - now)
|
|
else:
|
|
break
|
|
finally:
|
|
context.close()
|
|
|
|
sys.exit(rc)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|