I have messed with minicom but found that it limited certain charcters. I wanted to capture all characters including PCL from a printer port.
Github provided a version of serial_to_tcp.py that receives the serial port data and redirects it to a specific TCP port such as an Ethernet printer.
https://github.com/jabapyth/pydbgp/blob ... edirect.pyThis was the "Holy Grail" for what I needed to do. I modified it to write the same data to a text file and to ignore the Ethernet redirect if there is no connection.
My raspberry pi, with the addition of a serial RS232 DB9 level shifter is now a fantastic device server with serial text capture ability. I have also replaced older serial printers with any printer that uses the raw telnet port 9100 protocol. It is reliable and captures everything.
Below is my version of the Python code. Pyserial is needed which can be easily installed. Just modify the logfile path and set the arguments for the Ethernet path and port:
EX: ./serial_to_tcp.py -H 192.168.0.61 -P 9100
I want to share this because of the help and support of others that made this possible. Have fun!
- Code: Select all
#!/usr/bin/env python
# based on tcp_serial_redirect.py from pyserial
# http://pyserial.sourceforge.net/
""" Added logging and ping feature. The additions write the data stream to a text file
and tests the communication to the printer port. If communication is lost, the data transfer to
TCP is suspended. The communication test interval is 1 minute when the serial port is idle.
USAGE: serial_tcp_redirect.py [options]
Simple Serial to Network (TCP/IP) redirector.
This redirector waits for data from the serial port, then connects
to the remote host/port and relays data back and forth. If either
the COM port or the socket is closed, it restarts and waits for a
new connection on the COM port.
Options:
-p, --port=PORT serial port, a number, defualt = 0 or a device name
-b, --baud=BAUD baudrate, default 9600
-r, --rtscts enable RTS/CTS flow control (default off)
-x, --xonxoff enable software flow control (default off)
-H, --iphost Remote TCP/IP host (default 127.0.0.1)
-P, --ipport Remote TCP/IP port (default 9000)
Note: Only one connection at once is supported. If the connection is terminaed
it waits for the next connect.
"""
import sys, os, threading, getopt, socket, subprocess, datetime
try:
import serial
except:
print "Running serial_tcp_redirect requries pyserial"
print "available at http://pyserial.sourceforge.net/"
sys.exit(1)
try:
True
except NameError:
True = 1
False = 0
class pingCommunication():
def __init__(self, ipToPing):
self.ipToPing = ipToPing
self.pingQuantity = "1"
def pingProcess(self):
pingTest = "ping -c "+ self.pingQuantity + ' ' + self.ipToPing
#print pingTest -DL- Unremark for testing
process = subprocess.Popen(pingTest, shell=True, stdout=subprocess.PIPE)
process.wait()
returnCodeTotal = process.returncode
return returnCodeTotal
class SerialRedirector:
def __init__(self, ping_addr, tcp_addr, com_port, baudrate=9600, rtscts=False,
xonxoff=False, timeout=60):
# print "PRINTER IP ADDRESS IS %r" % (ping_addr)
self.ping = ping_addr
self.addr = tcp_addr
self.socket = None
self.thread_write = None
self.alive = False
# create the serial connection
ser = serial.Serial()
ser.port = com_port
ser.baudrate = baudrate
ser.rtscts = rtscts
ser.xonxoff = xonxoff
ser.timeout = timeout #required so that the reader thread can exit
try:
ser.open()
except serial.SerialException, e:
print "Could not open serial port %s: %s" % (ser.portstr, e)
sys.exit(1)
self.serial = ser
def go(self):
"""wait for incoming com data, then redirect
to the tcp port"""
self.alive = True
self.reader()
def socketStart(self):
print "CONNECTING TO IDE AT %r" % self.addr
# create the socket connection
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.addr[0], self.addr[1]))
except socket.error, details:
self.socket = None
raise
# start redirecting from serial to tcpip
self.thread_write = threading.Thread(target=self.writer)
self.thread_write.setDaemon(1)
self.thread_write.start()
def reader(self):
"""loop forever and copy serial->socket"""
print "Serial To TCP Application Started Using Port %s %s\n\nTCP/Printer IP Adress: %s\n\n" % (self.serial.portstr, str(datetime.datetime.now()),self.ping)
data = None
comOK = 2
data_on = 2
while not data:
# print comOK # -DL- remove remark for testing
comunicate = pingCommunication(self.ping)
ignoreTCP = comunicate.pingProcess()
if ignoreTCP == 0:
if not comOK == 1:
print "PRINTER CONNECTED %s %s\n\n" %(self.addr, str(datetime.datetime.now()))
comOK = 1
else:
if not comOK == 0:
print "PRINTER DISCONNECTED %s %s\n\n" %(self.addr, str(datetime.datetime.now()))
comOK = 0
data = self.serial.read(1) #read one, blocking
n = self.serial.inWaiting() #look if there is more
sys.stdout.flush()
# print "Received serial data on %s" % self.serial.portstr
# print "data [%r]" %data -DL- ????
while self.alive:
try:
if n:
data = data + self.serial.read(n) #and get as much as possible
if data:
logfile = open("/var/www/active_log.txt","a")
logfile.write("%s"%data)
#print ("%s" %data, end="")-DL- Removed for final testing
logfile.close()
if not self.socket and ignoreTCP == 0:
self.socketStart()
if comOK == 1:
self.socket.sendall(data) #send it over TCP
if not data_on == 1:
print "DATA RECEIVED " + str(datetime.datetime.now())
data_on = 1
sys.stdout.flush()
else:
comunicate = pingCommunication(self.ping)
ignoreTCP = comunicate.pingProcess()
if ignoreTCP == 0:
if not comOK == 1:
print "PRINTER CONNECTED %s %s\n\n" %(self.addr, str(datetime.datetime.now()))
comOK = 1
else:
if not comOK == 0:
print "PRINTER DISCONNECTED %s %s\n\n" %(self.addr, str(datetime.datetime.now()))
comOK = 0
if data_on == 1:
print "SERIAL PORT IDLE %s\n\n" % str(datetime.datetime.now())
data_on = 0
data = self.serial.read(1) #read one, blocking
n = self.serial.inWaiting() #look if there is more
sys.stdout.flush()
except socket.error, msg:
print msg
#probably got disconnected
break
self.alive = False
self.serial.close()
if self.thread_write:
self.thread_write.join()
def writer(self):
"""loop forever and copy socket->serial"""
while self.alive:
try:
data = self.socket.recv(1024)
if not data:
break
#print "socket data [%r]" % data -DL- removed for final testing
self.serial.write(data) #get a bunch of bytes and send them
except socket.error, msg:
print msg
#probably got disconnected
break
self.alive = False
# close the socket
try:
self.socket.close()
except socket.error, details:
pass #we quiting, dont care about the error
self.socket = None
def stop(self):
"""Stop copying"""
if self.alive:
self.alive = False
self.thread_write.join()
if __name__ == '__main__':
#parse command line options
try:
opts, args = getopt.getopt(sys.argv[1:],
"hp:b:rxP:H:",
["help", "port=", "baud=", "rtscts", "xonxoff", "ipport=", "iphost="])
except getopt.GetoptError:
# print help information and exit:
print >>sys.stderr, __doc__
sys.exit(2)
ser_port = '/dev/ttyAMA0'
baudrate = 9600
rtscts = False
xonxoff = False
iphost = '192.168.0.127'
ipport = 9100
for o, a in opts:
if o in ("-h", "--help"): #help text
usage()
sys.exit()
elif o in ("-p", "--port"): #specified port
try:
ser_port = int(a)
except ValueError:
ser_port = a
elif o in ("-b", "--baud"): #specified baudrate
try:
baudrate = int(a)
except ValueError:
raise ValueError, "Baudrate must be a integer number"
elif o in ("-r", "--rtscts"):
rtscts = True
elif o in ("-x", "--xonxoff"):
xonxoff = True
elif o in ("-H", "--iphost"):
iphost = a
elif o in ("-P", "--ipport"):
try:
ipport = int(a)
except ValueError:
raise ValueError, "Local port must be an integer number"
print "\n---Serial to TCP/IP redirector ---\nVersion 0 Jan-10-2013 By -DL-\n\n"
while 1:
try:
#enter console->serial loop
r = SerialRedirector(iphost,[iphost, ipport],
ser_port, baudrate, rtscts, xonxoff)
r.go()
except socket.error, msg:
print msg
print "\n--- exit ---"