Andrew Que Sites list Photos
Projects Contact
Main

June 30, 2020

Python key handler thread

I've been using Python for a lot of projects at work, and one problem I've had is dealing with the keyboard.  Typically I just want to be able to read a key and trigger some event when that happens.  Just reading a single key press isn't trivial.  There are plenty of write ups about how to change the terminal into single character mode, and that does work.  But what if you don't want the read to block so you can do an event driven system? 

In the past I have simply made a thread that puts the terminal into non-blocking single character mode.  The body of the thread reads the key.  If there isn't one, the thread sleeps.  If there is one, the key is read and dispatched.  The problem with this approach is the sleep time because it limits how fast keys respond.

The other day I tried a different implementation, which I'm sharing here.  For Linux machines it is more efficient because it is still blocking.  However, rather than block by just waiting on the terminal (stdin), it waits for either the terminal or a temporary file.  This is accomplished through the use of select, which can block until an event occurs on one or more files.  That eliminates the need for an arbitrary sleep.

#!/usr/bin/env python3
#------------------------------------------------------------------------------
# Uses: Thread for reading the keyboard and placing characters in a queue.
# Date: 2020-02-09
# Author: Andrew Que <https://www.DrQue.net/>
#
#                              (C) Copyright 2020
#------------------------------------------------------------------------------

# For reading key in non-blocking mode.
import sys
import os

import select
import tempfile

# For thread.
import threading
import time
import queue

class KeyboardThread( threading.Thread ) :
  """
  Thread for handling single character keyboard input.

  Input:
    shutdown - Instance of `threading.Event` used to signal the thread to shutdown.
    keyQueue - Instance of `queue.Queue` where keys that have been pressed are placed.
    start - When `True` the thread is started as soon as it is constructed.
    args, kwargs - Passed through unchanged to `threading.Thread`.

  Queue:
    New keys are placed in the queue as they arrive.  When the thread stops
    (shutdown event, end of input, or an error) `None` is placed in the queue.
    This will unblock the queue and there is no other way for `None` to enter
    the queue, thus it is safe to assume `None` means no more keys will arrive.
  """

  #-----------------------------------------------------------------------------
  def __init__( self, shutdown : threading.Event, keyQueue : queue.Queue, start=False, *args, **kwargs ) :
    """
    Create the keyboard thread and the helper that wakes it at shutdown.
    """
    super().__init__( *args, **kwargs )
    self._shutdown = shutdown
    self._keyQueue = keyQueue

    # We use a pipe to shutdown the thread.  Writes to the pipe unblock the
    # `select` in the keyboard thread.  So to end the thread we simply write
    # something to the pipe.  Index 0 is the read end, index 1 the write end.
    self._shutdownFile = os.pipe()

    # The helper is a daemon so that it cannot keep the interpreter alive if
    # the shutdown event is never set.
    threading.Thread( target=self._stopBody, daemon=True ).start()

    if start :
      self.start()

  #-----------------------------------------------------------------------------
  def _stopBody( self ) :
    """
    This function simply waits for the shutdown event and then unblocks the
    keyboard thread by writing something to the shutdown pipe.  It owns the
    write end of the pipe and closes it when finished.
    """

    # Wait for shutdown.
    self._shutdown.wait()

    writeEnd = self._shutdownFile[ 1 ]
    try :
      # Signal the keyboard thread by writing dummy data to the shutdown pipe.
      os.write( writeEnd, b"\0" )
    except OSError :
      # The keyboard thread has already finished and closed the read end, so
      # there is nobody left to wake.
      pass
    finally :
      os.close( writeEnd )

  #-----------------------------------------------------------------------------
  def run( self ) :
    """
    Thread body.  Uses the POSIX implementation when the terminal modules are
    available and falls back to the Windows implementation otherwise.
    """
    try :
      try :
        self.runLinux()
      except ModuleNotFoundError :
        self.runWindows()
    finally :
      # This thread owns the read end of the shutdown pipe.
      os.close( self._shutdownFile[ 0 ] )

  #-----------------------------------------------------------------------------
  def runLinux( self ) :
    """
    Blocking key reader for POSIX terminals.  Waits on both `stdin` and the
    shutdown pipe so no polling delay is needed.

    Raises `ModuleNotFoundError` (before doing anything else) on platforms
    without `tty`/`termios`.
    """

    import tty
    import termios

    try :
      # File descriptor of stdin.
      # (Likely this will always be 0.)
      fileDescriptor = sys.stdin.fileno()

      # Save the terminal settings and switch to cbreak (single character) mode.
      savedAttributes = termios.tcgetattr( fileDescriptor )
      try :
        tty.setcbreak( fileDescriptor )

        # Wait for data from either `stdin` or the shutdown pipe.
        readDescriptors = [ fileDescriptor, self._shutdownFile[ 0 ] ]

        while not self._shutdown.is_set() :
          # Wait for data.
          reading, _, _ = select.select( readDescriptors, [], [] )

          # If this was a key press...
          if fileDescriptor in reading :
            # Read the new key.
            # (Could be more than one character for the case of special keys)
            data = os.read( fileDescriptor, 8 )

            # An empty read is end-of-file.  `select` would report the
            # descriptor as ready forever, so stop rather than spin.
            if not data :
              break

            # Undecodable bytes are replaced instead of killing the thread.
            self._keyQueue.put( data.decode( errors="replace" ) )

      finally :
        # Restore terminal settings.
        termios.tcsetattr( fileDescriptor, termios.TCSANOW, savedAttributes )

    finally :
      # Unblock queue by adding `None`.  Done even when the terminal could not
      # be set up so a consumer is never left waiting.
      self._keyQueue.put( None )

  #-----------------------------------------------------------------------------
  def runWindows( self ) :
    """
    Polling key reader for Windows consoles.  Special keys arrive as a prefix
    byte (0x00 or 0xE0) followed by a key code; both are queued together as a
    two character string.
    """
    import msvcrt

    try :
      while not self._shutdown.is_set() :
        if msvcrt.kbhit() :
          # Read the new key.
          first = msvcrt.getch()

          if first in ( b"\x00", b"\xe0" ) :
            # Prefix of a special key--the key code follows.  The raw byte
            # values are used because neither byte is text.
            second = msvcrt.getch()
            character = chr( first[ 0 ] ) + chr( second[ 0 ] )
          else :
            character = first.decode( errors="replace" )

          self._keyQueue.put( character )
        else :
          # Come back later.
          # 100ms is rather arbitrary.
          self._shutdown.wait( 0.1 )
    finally :
      # Unblock queue by adding `None`.
      self._keyQueue.put( None )


#-------------------------------------------------------------------------------
if __name__ == '__main__':
  # Demonstration: print a line for each key pressed until ESC is pressed or
  # the 3 second timer expires.

  # Escape key value.
  ESC_KEY = chr( 27 )

  # Event to signal shutdown.
  shutdown = threading.Event()

  # Queue to hold keys.
  keyQueue = queue.Queue()

  class KeyUnload( threading.Thread ) :
    """
    Thread that removes keys from the queue and reports them.

    Input:
      shutdown - Instance of `threading.Event` set when ESC is pressed.
      keyQueue - Instance of `queue.Queue` keys are read from.
    """

    def __init__( self, shutdown : threading.Event, keyQueue : queue.Queue ) :
      super().__init__()
      self._shutdown = shutdown
      self._keyQueue = keyQueue

    def run( self ) :
      while not self._shutdown.is_set() :
        key = self._keyQueue.get()

        if key is None :
          # The keyboard thread has stopped--nothing further will arrive, so
          # waiting on the queue again would block forever.
          break

        if ESC_KEY == key :
          print( "Shutdown key pressed" )
          self._shutdown.set()
        elif key.startswith( ESC_KEY ) :
          # Escape followed by more characters is a special key sequence.
          print( "Special key code '" + key[ 1: ] + "' pressed" )
        elif "\n" == key :
          print( "Enter key was pressed" )
        elif key.isprintable() :
          print( "The '" + key + "' key was pressed" )
        elif len( key ) > 1 :
          print( key )
        else:
          print( "Character", ord( key ), "received." )

  # Thread to unload key presses.
  keyUnload = KeyUnload( shutdown, keyQueue )
  keyUnload.start()

  # Keyboard thread.
  keyboardThread = KeyboardThread( shutdown, keyQueue )
  keyboardThread.start()

  # Timer to shutdown the example after 3 seconds.
  # It must be started for the prompt below to be true.
  timer = threading.Timer( 3.0, shutdown.set )
  timer.start()

  # Wait for shutdown.
  print( "Hit ESC to stop, or wait 3 seconds." )
  shutdown.wait()

  print( "Shutdown" )

  # Stop timer (no effect if it has already fired).
  timer.cancel()

  # Wait for the threads to finish.
  keyUnload.join()
  keyboardThread.join()

  print( "Done" )

What we have is a thread that takes an Event as a parameter.  The event is used to signal the thread to shut down, typically when the program ends.  We create a pipe which is just a temporary file for which we can use select.  The body of the thread sets up the terminal for blocking single character mode.  Then it enters a loop that waits for a character from either the terminal or the shutdown file.  If there was a character from the terminal, it is placed in a queue for consumption by some other process.  A second thread is spun up that simply waits for the event to be set, and writes something to the shutdown pipe.  That will cause the main thread to unblock and see that it is time to shut down.

This efficient process only works on POSIX systems.  The class has a Windows version but it uses the less efficient timed-out block method.  The whole thing seems overly complicated, but is completely functional.  Maybe it will help someone else having the same issue.

June 29, 2020

Function for Curving Random Numbers

Your typical Pseudo-Random Number Generator (PRNG) will generate uniform random numbers.  That is, equal odds of any number.  This is often normalized so the random numbers are between 0 and 1.  There are times we do not wish equal odds, but rather some known odds.  One way to do this is through a mapping function that will curve the numbers.  Here is a simple curving function demo:

 

 
 
Curve position: unknown
Coefficient: unknown
Standard deviation: unknown

 

The circle can be moved around to modify the curve and it also depicts odds at that point.  Try dragging it around.  The curve will always pass through the center of the circle by generating a coefficient.  The plot below the main graph is a histogram, showing the distribution of values.  Unless the distribution is uniform (a coefficient of 1) more values will be on one side as opposed to the other.

For example, if the point is at 0.4, 0.3, that means 40% of all the random numbers will be below 0.3.  Conversely, 60% are above 0.3.  Since this is a one-to-one map transform we can simply plug in a normalized random number to generate a weighted random result.  Since the values are normalized we can scale it up (by multiplying the result by an amplitude constant) and/or offset it (adding a constant).

The function for the mapping is very simple.

$y = x^{c}, \qquad \{\, x \mid 0 \le x \le 1 \,\}, \qquad \{\, c \mid 0 < c \,\}$

This function will map any x value between 0.0 and 1.0 which is the range of normalized PRNG.  The coefficient is any positive real number above zero.  This allows the output to be weighted more toward smaller or larger numbers.  To use the method of applying a fixed-point (as demonstrated by the circle on the plot), the following equation can be used:

$c = \dfrac{\ln y}{\ln x}$

Where x and y are the coordinates of the point on the curve.  This will guarantee a coefficient for which the curve passes through that point.

2 comments have been made.

From Noah

July 03, 2020 at 1:50 PM

Very interesting! The 1:1 mapping here makes a lot of sense, and I like how easy it is to just normalize across a new range.

I wonder if more complex probabilities could be created the same way - for example a normal distribution? Binomial? Rayleigh? As long as it's continuous there could be a lot of interesting ways to mess with PRNG output, should you need it.

From Andrew Que (http://www.DrQue.net)

Middleton, WI

July 03, 2020 at 3:32 PM

Funny you should say that.  Working on a follow-up article that talks about this.

June 27, 2020

Game Server—My Path Finding Algorithm vs the Original Trade Wars 2002

Today I will explain the results of my A* search algorithm against the original Trade Wars 2002 algorithm. As explained in previous articles, Trade Wars 2002 uses a directed graph to represent the game field. This graph typically has 1,000 or more nodes connected at random. Traversing the playing field between any two locations can be done using the A* search algorithm. I wanted to test my algorithm speed against the original game and wrote a C version that would compile in DOSBox using the Borland C++ compiler from 1992.

To test, I needed to slow DOSBox down to a crawl. This would make the search take longer, and the goal was to have it take long enough to measure simply by counting seconds. At a cycle speed of 103 everything was slow enough to take several seconds. I would enter a path into a running version of Trade Wars and type the last digit just as the seconds rolled over. I could then just count seconds until the results of the calculation were printed on the screen. I could do the same thing for my software which had to compute the very same route using the same game file. The results:

Trade Wars 2002 v2 wide beta 7 computed a 9-hop path on a 5000 sector universe in 10 seconds. My implementation took 4 seconds after the 2+ minute load to memory, and 42 seconds if loading data from disk as it searched. I can’t prove Trade Wars isn’t doing disk I/O during its search but I think it is unlikely. Best case I actually expected the search times to be about the same so there is a question as to why my search happens faster. One reason might be if Trade Wars dynamically allocates memory during the search. I use a fixed amount of memory, some of which is likely never to be used and thus wasteful, but the fixed memory means I don’t have to ask for it during the calculation.

A second test used a 12-hop path.  It took Trade Wars 15 seconds after the sectors had been loaded and my search took 8 seconds for memory only.

Seeing the search speeds does answer a question I had about some checks Trade Wars does during game creation. There is a search that takes a fair amount of time looking for inaccessible sectors. I thought they might be doing a complete every-sector to every-other-sector path check. But the speeds at which this runs are far too fast for such an operation and something else must be going on.

This has been a fun experiment and proves to me that I have succeeded in implementing a path finding system for a Trade Wars directed graph style universe. That could be useful if I do end up designing a game that uses a similar playing area.

June 26, 2020

Game Server—Path Finding Mistake

There is an issue with my implementation of the path finding algorithm if I want to do a speed comparison with the original Trade Wars 2002. I implemented a bi-directional search in hopes of finding a solution faster as the problem was being attacked from two directions. However, I discovered that some of my paths did not match what the game actually calculated. This is because a Trade Wars universe isn’t fully two-way—it does contain some one-way connections. This means some routes are faster in reverse. Since my algorithm searches from both directions at once, I have a 50% chance of missing a shortcut should it exist.

What’s funny about this fact is that when I implemented the Javascript search, I thought about this very fact. However, my implementation for the directed graph always used two-directional links. Thus I never had to worry about one-way links.

The only way around this bug is to not search in both directions. This is less efficient, but the only way not to miss one-way links. It turns out that this is a good exercise anyway. The single direction A* code uses much less memory because it doesn’t need to have direction associated with the search path. I could entirely remove one search structure, and reduce the other to a simple array of integers. Let’s have a look:

//=============================================================================
// Uses: Find a path between two sectors.
// Date: 2020-06-26
// Author: Andrew Que <www.DrQue.net>
// Notes:
//   Uses a bi-directional A* search.
//   Designed for C99 and Borland C++ 3.1 compilers.
//=============================================================================
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "STDINT.H"
#include "COURSE.H"

// Direction a search is traveling.  The two values are negatives of one
// another, so the opposing direction of `direction` is `-direction`.
typedef enum
{
  REVERSE = -1,
  FORWARD = 1
} Direction;

// Static storage for every possible search path.
// Indexed by sector number: `queueItem` stores the node for a sector in the
// slot with that sector's number.
static PathNode far searchPaths[ MAX_SECTORS ];

// What is known about one sector during a search.
typedef struct
{
  // Path node recorded for this sector.  Zero/`NULL` until the sector has
  // been queued (`course` zeroes the whole table before each search).
  PathNode * path;

  // Direction of the search that queued this sector.
  Direction direction;
} TreeNode;

// The tree is a set of what sectors have been searched thus far.
// Indexed by sector number.
static TreeNode far tree[ MAX_SECTORS ];

// The search FIFO is a list of sectors that need to be searched.
// First-In/First-Out ensures the first path found is the shortest.
typedef struct Search
{
  // Sector waiting to be searched.
  uint16 sector;

  // Direction of the search this sector belongs to.
  Direction direction;
} Search;

// FIFO storage and indexes.  `searchHead` is advanced by `queueItem` as
// sectors are added, `searchTail` is advanced by `course` as sectors are
// removed.  The FIFO is treated as empty when the two are equal.
static Search searchFIFO[ MAX_SECTORS ];
static unsigned searchHead = 0;
static unsigned searchTail = 0;

//-----------------------------------------------------------------------------
// Uses:
//   Add a new sector to be searched to the search FIFO.
// Input:
//   currentPath - Path thus far.  `NULL` for the first sector of a search.
//   sector - New sector to search through.
//   direction - The direction this search is traveling.
// Notes:
//   Records the sector in `searchPaths` and `tree` (both indexed by sector
//   number) and appends it to `searchFIFO`.
//-----------------------------------------------------------------------------
static void queueItem
(
  PathNode * currentPath,
  unsigned sector,
  Direction direction
)
{
  // Add this sector to the new path.
  PathNode * newPath = &searchPaths[ sector ];
  newPath->sector    = sector;
  newPath->link      = currentPath;

  // Add an entry into the search tree about this path.
  tree[ sector ].path      = newPath;
  tree[ sector ].direction = direction;

  // Add sector to search FIFO.
  // The entry is stored at the head index and the index is advanced
  // afterward.  The consumer reads slot `searchTail` starting from 0 and
  // stops when `searchTail == searchHead`, so advancing first would leave
  // slot 0 unwritten (yet consumed) and the newest entry never consumed.
  searchFIFO[ searchHead ].sector    = sector;
  searchFIFO[ searchHead ].direction = direction;
  searchHead += 1;
}

//-----------------------------------------------------------------------------
// Uses:
//   Compute a course between sectors.
// Input:
//   sectors - List of sectors.
//   start - Sector number from which to start.
//   finish - Sector number to finish.
// Output:
//   Linked-list with the path, beginning at `start`.  `NULL` if there was no
//   path found, or if `start` and `finish` are the same sector.
// Notes:
//   The returned nodes live in static storage that is reused by the next
//   call.
//-----------------------------------------------------------------------------
PathNode const * course( Sectors const * sectors, unsigned start, unsigned finish )
{
  // Completed route.  Remains `NULL` until the two searches meet.
  PathNode * route = NULL;

  // No search is made when both ends are the same sector.
  if ( start == finish )
    return route;

  // Empty the search FIFO and forget the sectors reached by earlier searches.
  searchTail = 0;
  searchHead = 0;
  memset( &tree, 0, sizeof( tree ) );

  // Start a search from each end of the route.
  queueItem( NULL, start, FORWARD );
  queueItem( NULL, finish, REVERSE );

  // Work through queued sectors until the searches meet or nothing remains.
  while ( ( searchHead != searchTail ) && ( ! route ) )
  {
    Interconnects neighbors;
    unsigned      index;

    // Take the oldest entry from the FIFO.
    Search const * const entry   = &searchFIFO[ searchTail ];
    unsigned const       origin  = entry->sector;
    Direction const      heading = entry->direction;

    searchTail += 1;

    getInterconnects( sectors, neighbors, origin );

    // Examine each connection, stopping as soon as a route is known.
    for ( index = 0; ( index < INTERCONNECTS ) && ( ! route ); index += 1 )
    {
      unsigned const neighbor = neighbors[ index ];
      TreeNode * reached;

      // Zero entries are skipped (presumably unused connection slots).
      if ( 0 == neighbor )
        continue;

      reached = &tree[ neighbor ];

      if ( 0 == reached->path )
      {
        // Not seen by either search yet--extend this search to it.
        queueItem( tree[ origin ].path, neighbor, heading );
      }
      else if ( reached->direction == -heading )
      {
        // The opposing search already reached this sector, so the two
        // partial paths can be joined into the full route.
        PathNode * startHalf;
        PathNode * finishHalf;
        PathNode * reversed = NULL;
        PathNode * node;

        // Decide which partial path belongs to the search from `start`.
        if ( FORWARD == heading )
        {
          startHalf  = tree[ origin ].path;
          finishHalf = reached->path;
        }
        else
        {
          startHalf  = reached->path;
          finishHalf = tree[ origin ].path;
        }

        // The forward half is linked from the meeting point back toward
        // `start`; reverse it in place so it leads away from `start`.
        node = startHalf;
        while ( node )
        {
          PathNode * next = node->link;
          node->link = reversed;
          reversed   = node;
          node       = next;
        }

        route = reversed;

        // The old head of the forward half is now its tail; continue from
        // there along the half that leads to `finish`.
        startHalf->link = finishHalf;
      }
    }
  }

  return route;
}

//-----------------------------------------------------------------------------
// Uses:
//   Print a course on a single line of standard output.
// Input:
//   path - Path found through `course` function.  Nothing is printed when
//          this is `NULL`.
//-----------------------------------------------------------------------------
void printCourse( PathNode const * path )
{
  PathNode const * node;

  for ( node = path; node; node = node->link )
  {
    // Every sector is followed by '>' except the last, which ends the line.
    char const separator = ( node->link ) ? '>' : '\n';

    printf( " %u %c", node->sector, separator );
  }
}