Chris Lacy's Software Engineering Blog

Friday Dec 23, 2011

Linux VPN Restart Python Script

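This script watches a NetworkManager VPN connection over D-Bus and re-activates it whenever it fails or disconnects.

Usage: ./vpnRestart <interface id> <vpn id>
Both arguments are matched as substrings of NetworkManager connection ids, so a partial name is enough.
For example: ./vpnRestart eth0 myVpn
Tested on Fedora and Ubuntu.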

#!/usr/bin/env python


import logging
import sys, getopt
import time
import gobject, dbus
from dbus.mainloop.glib import DBusGMainLoop



class DbusHelper:
    """Small convenience layer over the D-Bus system bus for a single service.

    Every proxy created through this helper is looked up under the bus name
    given at construction time.
    """

    # Bus name handed to get_object(); assigned per instance in __init__.
    basePackage = None
    # System bus connection; assigned per instance in __init__.
    systemBus = None

    def __init__(self, basePackage):
        """Remember the bus name and open a connection to the system bus."""
        self.basePackage = basePackage
        self.systemBus = dbus.SystemBus()

    def createProxy(self, path):
        """Return the proxy object exported by the service at object `path`."""
        bus = self.systemBus
        return bus.get_object(self.basePackage, path)

    def createInterface(self, proxy, interfacePackage):
        """Wrap `proxy` so that calls are made on interface `interfacePackage`."""
        wrapped = dbus.Interface(proxy, dbus_interface=interfacePackage)
        return wrapped

    def createIface(self, proxyPath, interfacePackage):
        """Create the proxy for `proxyPath` and wrap it in one step."""
        proxy = self.createProxy(proxyPath)
        return self.createInterface(proxy, interfacePackage)

    def addReceiver(self, handler, interface, signalName):
        """Subscribe `handler` to signal `signalName` of D-Bus `interface`."""
        self.systemBus.add_signal_receiver(
            handler,
            signal_name=signalName,
            dbus_interface=interface,
        )



class DbusPropertiesHelper:
    """Reads properties of one D-Bus interface on one object.

    Access goes through the standard 'org.freedesktop.DBus.Properties'
    interface of the object at the given path.
    """

    # Name of the interface whose properties are read; set in __init__.
    interfacePath = None
    # Properties interface of the target object; set in __init__.
    propertiesI = None

    def __init__(self, proxyPath, dbusHelper, interfacePath):
        """Bind to the Properties interface of the object at `proxyPath`.

        `dbusHelper` must provide createIface(path, interface); `interfacePath`
        is the interface name later passed to every Get call.
        """
        propertiesIface = dbusHelper.createIface(
            proxyPath, 'org.freedesktop.DBus.Properties')
        self.interfacePath = interfacePath
        self.propertiesI = propertiesIface

    def get(self, prop):
        """Return the current value of property `prop` on the bound interface."""
        reader = self.propertiesI
        return reader.Get(self.interfacePath, prop)



class NetworkManagerHelper:
    """Keeps a NetworkManager VPN connection up by re-activating it over D-Bus.

    The helper locates two saved connection profiles by (partial) id: the
    base connection the VPN runs on top of, and the VPN itself.  It activates
    the VPN when it is not active and, once monitoring is started, repeats
    that whenever a VpnStateChanged signal reports a failed or disconnected
    VPN.
    """

    # Values of the first VpnStateChanged argument that trigger a reconnect
    # (the "failed" and "disconnected" VPN connection states of the
    # NetworkManager D-Bus API).
    FAILED = 6
    DISCONNECTED = 7

    # Seconds to sleep between checks while the base connection is down.
    POLL_INTERVAL_SECONDS = 10

    # Interface names used with the D-Bus helper.
    _SETTINGS_CONNECTION_IFACE = 'org.freedesktop.NetworkManager.Settings.Connection'

    # Substrings used to find the base and VPN profiles; set in __init__.
    interfaceQuery = None
    vpnQuery = None

    # D-Bus plumbing; set in __init__.
    dbusHelper = None
    networkManagerI = None
    settingsI = None

    def __init__(self, interfaceQuery, vpnQuery):
        """Store the two queries and connect to NetworkManager on the system bus.

        interfaceQuery -- substring of the base connection's id
        vpnQuery       -- substring of the VPN connection's id
        """
        self.interfaceQuery = interfaceQuery
        self.vpnQuery = vpnQuery

        # The glib main loop must be the default before the bus is opened,
        # otherwise signal receivers registered later are never dispatched.
        DBusGMainLoop(set_as_default=True)
        self.dbusHelper = DbusHelper('org.freedesktop.NetworkManager')
        self.networkManagerI = self.dbusHelper.createIface(
            '/org/freedesktop/NetworkManager',
            'org.freedesktop.NetworkManager')
        self.settingsI = self.dbusHelper.createIface(
            '/org/freedesktop/NetworkManager/Settings',
            'org.freedesktop.NetworkManager.Settings')

    def _getConnectionSettings(self, connectionPath):
        """Return the settings dictionary of the saved connection at `connectionPath`."""
        connectionI = self.dbusHelper.createIface(
            connectionPath, self._SETTINGS_CONNECTION_IFACE)
        return connectionI.GetSettings()

    def findConnection(self, query):
        """Return the path of the first saved connection whose id contains `query`.

        Returns None when no connection id contains the query.
        """
        for path in self.getConnections():
            settings = self._getConnectionSettings(path)
            if query in settings['connection']['id']:
                return path

        return None

    def getConnections(self):
        """Return the object paths of all saved connections."""
        return self.settingsI.ListConnections()

    def getConnectionPath(self, uuid):
        """Return the path of the saved connection whose uuid equals `uuid`.

        Returns None when no saved connection has that uuid.
        """
        for path in self.getConnections():
            # Each connection has its own settings object; the settings must
            # be read from that object, not from the Settings service itself.
            settings = self._getConnectionSettings(path)
            if settings['connection']['uuid'] == uuid:
                return path

        return None

    def getActiveConnection(self, path):
        """Return the active-connection path backed by saved connection `path`.

        Returns None when that saved connection is not currently active.
        """
        managerProps = DbusPropertiesHelper(
            '/org/freedesktop/NetworkManager',
            self.dbusHelper,
            'org.freedesktop.NetworkManager')

        for activePath in managerProps.get('ActiveConnections'):
            activeProps = DbusPropertiesHelper(
                activePath,
                self.dbusHelper,
                'org.freedesktop.NetworkManager.Connection.Active')
            if activeProps.get('Connection') == path:
                return activePath

        return None

    def activateVpn(self, connection, specific_object):
        """Ask NetworkManager to activate `connection`.

        "/" is passed as the device argument; `specific_object` is passed
        through unchanged (the caller supplies the active base connection).
        """
        self.networkManagerI.ActivateConnection(connection, "/", specific_object)

    def isConnectionActive(self, connectionPath):
        """Return True when the saved connection `connectionPath` is active."""
        return self.getActiveConnection(connectionPath) is not None

    def activateVpnIf(self):
        """Activate the VPN unless it is already active.

        Blocks, polling every POLL_INTERVAL_SECONDS, until the base
        connection is active.  If either profile cannot be found an error is
        logged and nothing is activated; previously a missing base profile
        made this method wait forever.
        """
        baseConnectionPath = self.findConnection(self.interfaceQuery)
        if baseConnectionPath is None:
            logging.error("base connection matching '%s' not found",
                          self.interfaceQuery)
            return

        # Poll on the active connection object itself so that the value used
        # for activation is the one that was just observed to exist.
        baseActiveConnection = self.getActiveConnection(baseConnectionPath)
        while baseActiveConnection is None:
            logging.debug("waiting for base connection")
            time.sleep(self.POLL_INTERVAL_SECONDS)
            baseActiveConnection = self.getActiveConnection(baseConnectionPath)

        vpnConnectionPath = self.findConnection(self.vpnQuery)
        if vpnConnectionPath is None:
            logging.error("vpn connection matching '%s' not found",
                          self.vpnQuery)
            return

        if not self.isConnectionActive(vpnConnectionPath):
            self.activateVpn(vpnConnectionPath, baseActiveConnection)

    def startVpnMonitor(self):
        """Bring the VPN up, subscribe to VPN state changes and run the main loop.

        Does not return while the glib main loop is running.
        """
        self.activateVpnIf()
        self.dbusHelper.addReceiver(
            self.vpnConnectionHandler,
            "org.freedesktop.NetworkManager.VPN.Connection",
            "VpnStateChanged")
        gobject.MainLoop().run()

    def vpnConnectionHandler(self, *args, **kwargs):
        """Signal handler: re-activate the VPN when it failed or disconnected.

        The first positional argument is the new VPN state; any further
        arguments are ignored.
        """
        state = args[0]
        logging.debug("state %s", state)

        if state in (NetworkManagerHelper.FAILED, NetworkManagerHelper.DISCONNECTED):
            self.activateVpnIf()



if __name__ == "__main__":

    # Only errors are emitted at this level; the debug messages below stay
    # silent unless the level is lowered.
    logging.basicConfig(level=logging.ERROR)

    # Exactly two arguments are required: the interface id and the vpn id.
    # A single parenthesised argument prints the same text under the
    # Python 2 print statement this script was written for.
    if len(sys.argv) != 3:
        print("Usage: vpnRestart <interface id> <vpn id>")
        sys.exit(1)

    interfaceQuery, vpnQuery = sys.argv[1:3]
    networkManagerHelper = NetworkManagerHelper(interfaceQuery, vpnQuery)

    # Check both profiles up front, base connection first, and stop at the
    # first one that cannot be found.
    for label, query in (("Interface id ", interfaceQuery), ("Vpn id ", vpnQuery)):
        connectionPath = networkManagerHelper.findConnection(query)
        logging.debug(connectionPath)

        if connectionPath is None:
            print(label + query + " not found.")
            sys.exit(1)

    # Activates the VPN if needed and then blocks in the main loop.
    networkManagerHelper.startVpnMonitor()

Calendar

Feeds

Navigation