Chris Lacy's Software Engineering Blog
Friday Dec 23, 2011
Linux VPN Restart Python Script
Usage: ./vpnRestart <interface id> <vpn id>
For example: ./vpnRestart eth0 myVpn
Tested on Fedora and Ubuntu
#!/usr/bin/env python
import logging
import sys, getopt
import time
import gobject, dbus
from dbus.mainloop.glib import DBusGMainLoop
class DbusHelper:
basePackage = None
systemBus = None
def __init__(self, basePackage):
self.basePackage = basePackage
self.systemBus = dbus.SystemBus()
def createProxy(self, path):
return self.systemBus.get_object(self.basePackage, path)
def createInterface(self, proxy, interfacePackage):
return dbus.Interface(proxy, dbus_interface=interfacePackage)
def createIface(self, proxyPath, interfacePackage):
return self.createInterface(self.createProxy(proxyPath), interfacePackage)
def addReceiver(self, handler, interface, signalName):
self.systemBus.add_signal_receiver(handler,
dbus_interface=interface,
signal_name=signalName)
class DbusPropertiesHelper:
interfacePath = None
propertiesI = None
def __init__(self, proxyPath, dbusHelper, interfacePath):
self.interfacePath = interfacePath
self.propertiesI = dbusHelper.createIface(proxyPath, 'org.freedesktop.DBus.Properties')
def get(self, prop):
return self.propertiesI.Get(self.interfacePath, prop)
class NetworkManagerHelper:
FAILED = 6
DISCONNECTED = 7
interfaceQuery = None
vpnQuery = None
dbusHelper = None
networkManagerI = None
settingsI = None
def __init__(self, interfaceQuery, vpnQuery):
self.interfaceQuery = interfaceQuery
self.vpnQuery = vpnQuery
DBusGMainLoop(set_as_default=True)
self.dbusHelper = DbusHelper('org.freedesktop.NetworkManager')
self.networkManagerI = self.dbusHelper.createIface('/org/freedesktop/NetworkManager', 'org.freedesktop.NetworkManager')
self.settingsI = self.dbusHelper.createIface('/org/freedesktop/NetworkManager/Settings', 'org.freedesktop.NetworkManager.Settings')
def findConnection(self, query):
connection = None
for c in self.getConnections():
ci = self.dbusHelper.createIface(c, 'org.freedesktop.NetworkManager.Settings.Connection')
settings = ci.GetSettings()
if settings['connection']['id'].find(query) != -1:
connection = c
break
return connection
def getConnections(self):
return self.settingsI.ListConnections()
def getConnectionPath(self, uuid):
connection = None
for c in self.getConnections():
settings = settingsI.GetSettings()
if settings['connection']['uuid'] == uuid:
connection = c
break
return connection
def getActiveConnection(self, path):
connection = None
nmdph = DbusPropertiesHelper('/org/freedesktop/NetworkManager', self.dbusHelper, 'org.freedesktop.NetworkManager')
for a in nmdph.get('ActiveConnections'):
acdph = DbusPropertiesHelper(a, self.dbusHelper, 'org.freedesktop.NetworkManager.Connection.Active')
if acdph.get('Connection') == path:
connection = a
break
return connection
def activateVpn(self, connection, specific_object):
self.networkManagerI.ActivateConnection(connection, "/", specific_object)
def isConnectionActive(self, connectionPath):
c = self.getActiveConnection(connectionPath)
return c != None
def activateVpnIf(self):
baseConnectionPath = self.findConnection(self.interfaceQuery)
baseConnected = self.isConnectionActive(baseConnectionPath)
while (baseConnected is False):
logging.debug("waiting for base connection")
time.sleep(10)
baseConnected = self.isConnectionActive(baseConnectionPath)
baseActiveConnection = self.getActiveConnection(baseConnectionPath)
vpnConnectionPath = self.findConnection(self.vpnQuery)
vpnConnected = self.isConnectionActive(vpnConnectionPath)
if (vpnConnected is False):
self.activateVpn(vpnConnectionPath, baseActiveConnection)
connected = self.isConnectionActive(vpnConnectionPath)
def startVpnMonitor(self):
self.activateVpnIf()
self.dbusHelper.addReceiver(self.vpnConnectionHandler, "org.freedesktop.NetworkManager.VPN.Connection", "VpnStateChanged")
gobject.MainLoop().run()
def vpnConnectionHandler(self, *args, **kwargs):
state = args[0]
logging.debug("state %s", state)
if ((state == NetworkManagerHelper.FAILED) or (state == NetworkManagerHelper.DISCONNECTED)):
self.activateVpnIf()
if __name__ == "__main__":
logging.basicConfig(level=logging.ERROR)
if(len(sys.argv) != 3):
print "Usage: vpnRestart <interface id> <vpn id>"
sys.exit(1)
interfaceQuery = sys.argv[1]
vpnQuery = sys.argv[2]
networkManagerHelper = NetworkManagerHelper(interfaceQuery, vpnQuery)
baseConnectionPath = networkManagerHelper.findConnection(interfaceQuery)
logging.debug(baseConnectionPath)
if(baseConnectionPath is None):
print "Interface id " + interfaceQuery + " not found."
sys.exit(1)
vpnConnectionPath = networkManagerHelper.findConnection(vpnQuery)
logging.debug(vpnConnectionPath)
if(vpnConnectionPath is None):
print "Vpn id " + vpnQuery + " not found."
sys.exit(1)
networkManagerHelper.startVpnMonitor()
Posted at 01:02AM Dec 23, 2011 by chris in General |