2022-06-03 17:33:58 +02:00
#!/usr/bin/env python3
""" @package docstring
2022-06-04 09:11:59 +02:00
Manchester decoder
2022-06-03 17:33:58 +02:00
2022-06-04 09:11:59 +02:00
Decodes a file encoded with the manchester code
2022-06-03 17:33:58 +02:00
See https : / / www . youtube . com / watch ? v = 8 BhjXqw9MqI & list = PLowKtXNTBypH19whXTVoG3oKSuOcw_XeW & index = 3
and following videos by awesome Ben Eater
Very slow and inefficient implementation , meant only to be clear and didactic
@author Daniele Verducci < daniele . verducci @ichibi.eu >
This program is free software : you can redistribute it and / or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation , either version 3 of the License , or
( at your option ) any later version .
This program is distributed in the hope that it will be useful ,
but WITHOUT ANY WARRANTY ; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
GNU General Public License for more details .
You should have received a copy of the GNU General Public License
along with this program . If not , see < http : / / www . gnu . org / licenses / > .
"""
import os
import sys
import logging
import wave
import struct
NAME = ' manchester-decoder '
VERSION = ' 0.1 '
DESCRIPTION = ' Decodes a file using the manchester encoding '
2022-06-04 09:11:59 +02:00
FRAME_DELIMITER = 126 # (01111110)
2022-06-03 17:33:58 +02:00
PREAMBLE_DURATION = 128
ZERO_POINT = 0 # The 0 value: values less than this are considered 0, more than this 1
class Main :
def __init__ ( self ) :
self . _log = logging . getLogger ( ' main ' )
self . clockDuration = 0
def run ( self , inputFile , outputFile ) :
# Open input audio file
self . audioSource = wave . open ( inputFile , ' r ' )
try :
self . syncWithClock ( )
self . _log . info ( " Sync: clock duration is {} " . format ( self . clockDuration ) )
2022-06-04 09:11:59 +02:00
self . waitForStart ( )
2022-06-03 17:33:58 +02:00
#self._log.info("Found start of data")
except ValueError as e :
self . _log . error ( " Ran out of input data before completing initialization! " )
#self.decodeFile()
self . audioSource . close ( )
def syncWithClock ( self ) :
# Uses the preamble to obtain the clock duration
2022-06-04 09:11:59 +02:00
analyzedCycles = 0
2022-06-03 17:33:58 +02:00
while True :
2022-06-04 09:11:59 +02:00
( cycles , raising ) = self . goToNextZeroCrossing ( True )
analyzedCycles = analyzedCycles + cycles
if analyzedCycles > PREAMBLE_DURATION * self . clockDuration / 4 :
# At this point we should have an idea of the clock duration, move on
return
2022-06-03 17:33:58 +02:00
def waitForStart ( self ) :
# After the clock has been extimated, continue reading and wait for first delimiter
lastByte = 0
while True :
2022-06-04 09:11:59 +02:00
value = self . decodeBit ( )
2022-06-03 17:33:58 +02:00
2022-06-04 09:11:59 +02:00
# Shift the byte to left
2022-06-03 17:33:58 +02:00
lastByte = lastByte << 1
2022-06-04 09:11:59 +02:00
# Truncate the length to 8 bits
lastByte = lastByte & 255 # 0 11111111
# Add the read bit in the least significant position
2022-06-03 17:33:58 +02:00
if value :
lastByte = lastByte + 1
if lastByte == FRAME_DELIMITER :
# Found frame delimiter! We are in sync! YAY!
2022-06-04 09:11:59 +02:00
self . _log . info ( " Found first frame delimiter " )
2022-06-03 17:33:58 +02:00
return
2022-06-04 09:11:59 +02:00
2022-06-03 17:33:58 +02:00
#def decodeFile():
# From the bit after the FRAME_DELIMITER on, there is the real data. Decode and write to file
#try:
#except ValueError as e:
# Completed reading file
2022-06-04 09:11:59 +02:00
def decodeBit ( self ) :
# Decodes a bit. Searches for the phase invertion at 75% to 125% of the clock cycle
bitDuration = 0
2022-06-03 17:33:58 +02:00
while True :
2022-06-04 09:11:59 +02:00
( duration , raising ) = self . goToNextZeroCrossing ( False )
bitDuration = bitDuration + duration
if bitDuration < self . clockDuration * 0.75 :
# Ignore: half-cycle crossing due to two equal digits one near the other
continue
if bitDuration > self . clockDuration * 1.25 :
# Lost tracking!
raise Exception ( " Lost tracking! No phase inversion found between {} and {} samples from the last one " . format ( self . clockDuration * 0.75 , self . clockDuration * 1.25 ) )
2022-06-03 17:33:58 +02:00
2022-06-04 09:11:59 +02:00
# This is our phase inversion signal
return raising
2022-06-03 17:33:58 +02:00
# Reads and decodes 2 raw bits into 1 decoded bit. 01 (raising) = 1, 10 (falling) = 0
# Works only once clock is synced
# The bits are read at 1/4 and 3/4 of clock cycle
firstHalfFrame = self . audioSource . readframes ( ( self . clockDuration / 2 ) - 1 )
if not firstHalfFrame :
raise ValueError ( ' No more data to read ' )
secondHalfFrame = self . audioSource . readframes ( 1 )
if not secondHalfFrame :
raise ValueError ( ' No more data to read ' )
firstHalfRawBit = int ( struct . unpack ( ' <h ' , firstHalfFrame ) [ 0 ] ) > ZERO_POINT
secondHalfRawBit = int ( struct . unpack ( ' <h ' , secondHalfFrame ) [ 0 ] ) > ZERO_POINT
if not firstHalfRawBit and secondHalfRawBit :
return True
elif firstHalfRawBit and not secondHalfRawBit :
return True
else :
# Lost sync!!!
raise Exception ( )
2022-06-04 09:11:59 +02:00
def goToNextZeroCrossing ( self , adjustClockDuration ) :
# Find the next zero crossing and returns:
# (cycles since last inversion, True if is raising, False if is falling)
cyclesSinceLastInversion = 0
prev = None
while True :
frame = self . audioSource . readframes ( 1 )
if not frame :
raise ValueError ( ' No more data to read ' )
v = int ( struct . unpack ( ' <h ' , frame ) [ 0 ] ) > ZERO_POINT
if prev == None :
prev = v
if v != prev :
# Zero-point crossing!
if adjustClockDuration :
self . clockDuration = ( self . clockDuration + cyclesSinceLastInversion ) / 2
return ( cyclesSinceLastInversion , v )
2022-06-03 17:33:58 +02:00
2022-06-04 09:11:59 +02:00
cyclesSinceLastInversion = cyclesSinceLastInversion + 1
2022-06-03 17:33:58 +02:00
if __name__ == ' __main__ ' :
import argparse
parser = argparse . ArgumentParser (
prog = NAME + ' .py ' ,
description = NAME + ' ' + VERSION + ' \n ' + DESCRIPTION ,
formatter_class = argparse . RawTextHelpFormatter
)
parser . add_argument ( ' inputFile ' , help = " audio file to decode (in wav format) " )
parser . add_argument ( ' outputFile ' , help = " decoded file to write " )
parser . add_argument ( ' -v ' , ' --verbose ' , action = ' store_true ' , help = " verbose output " )
args = parser . parse_args ( )
if args . verbose :
logging . basicConfig ( level = logging . INFO )
else :
logging . basicConfig ( )
main = Main ( )
main . run ( args . inputFile , args . outputFile )
sys . exit ( 0 )