2

Let me just start with this... I don't know Python at all; I am going in circles and I simply don't get it. I am completely open to alternative and easier methods.

My goal: connect to different servers, run the same command on each, and later (as in not now/yet) use the output for productive things. Awesome.

What I have: found some code somewhere (I'll try and find a link and update this). I modified it a little. It connects to different servers, runs same command.

Problem: I don't know how to stop the reactor once everything is done. And I really want to stop it without pressing cntrl+c. I think I need to defer something but I have no idea what or where. I feel like the when the SSHChannel closes, that needs to somehow bubble up to SSHConnection, to stop the service... so the transport can know what's up? And I keep wanting to somehow wrap each reactor.connectTCP(server, 22, factory) in a deferred somehow. And I feel like I maybe need a controller class. I tried these things but I did not try them correctly. And maybe gatherResults might help but, again, I don't know what to put it on exactly.

from twisted.conch.ssh import transport, connection, userauth, channel, common
from twisted.internet import defer, protocol, reactor
import sys, struct  

USER = 'username'
PASS = 'thisisforpersonalusesoicanstoreit!' 
CMD  = 'echo "merely this and nothing more"'


from twisted.python import log
import sys
log.startLogging(sys.stdout)


class ClientCommandTransport(transport.SSHClientTransport):
    def __init__(self, username, password, command):
        self.username = username
        self.password = password
        self.command  = command

    def verifyHostKey(self, pubKey, fingerprint):
        print fingerprint 
        return defer.succeed(True)

    def connectionSecure(self):
        self.requestService(
            PasswordAuth(self.username, self.password,
                         ClientConnection(self.command)))    

class PasswordAuth(userauth.SSHUserAuthClient):
    def __init__(self, user, password, connection):
        userauth.SSHUserAuthClient.__init__(self, user, connection)
        self.password = password

    def getPassword(self, prompt=None):
        return defer.succeed(self.password)

class ClientConnection(connection.SSHConnection):
    def __init__(self, cmd, *args, **kwargs):
        connection.SSHConnection.__init__(self)
        self.command = cmd

    def serviceStarted(self):
        self.openChannel(CommandChannel(self.command, conn=self))  

class CommandChannel(channel.SSHChannel):
    name = 'session'

    def __init__(self, command, *args, **kwargs):
        channel.SSHChannel.__init__(self, *args, **kwargs)
        self.command = command
        self.data = ''

    def channelOpen(self, data):
        self.conn.sendRequest(
            self, 'exec', common.NS(self.command), wantReply=True).addCallback(
                                                            self._gotResponse)

    def _gotResponse(self, _):
        self.conn.sendEOF(self) 
        self.loseConnection() 

    def dataReceived(self, data):
        #self.data += data
        print data 

    def request_exit_status(self, data):
        (status,) = struct.unpack('>L', data)
        # print 'exit status = ', status  

class ClientCommandFactory(protocol.ClientFactory):
    def __init__(self, command=CMD):
        self.username = USER
        self.password = PASS
        self.command  = command

    def buildProtocol(self, addr):
        protocol = ClientCommandTransport(
            self.username, self.password, self.command)
        return protocol    


masters = ['server1','server2','server3','server4','server5']

factory = ClientCommandFactory()

for server in masters:
    print server
    reactor.connectTCP(server, 22, factory)

reactor.run()

I did play with deferring getPage for an http request (which did work) but I can't seem to reapply it with reactors and ssh connections.

These are the resources I really wish that I could make sense of:


With the one answer below... I tested out passing down a reference to the factory and ended up stopping the rector in SSHChannel closed() if the factory didn't have anymore connections in its array (or whatever python calls arrays).

I updated the factory to now also include this method:

class ClientCommandFactory(protocol.ClientFactory): 

    def clientConnectionLost(self, connector, reason):
        print reason

I took a look at logging because I'm generally interested in what is happening and... (some of these are my own statements, some are default)

014-10-16 13:42:58-0500 [SSHChannel session (0) on SSHService ssh-connection on ClientCommandTransport,client] closed last TCP connection
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] service stopped 
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] connection lost
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion: Connection lost.
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] ]
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] connection lost
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion: Connection lost.
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] ]
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] Stopping factory <__main__.ClientCommandFactory instance at 0x02323030>
2014-10-16 13:42:58-0500 [-] Main loop terminated.

So... it says the connection was lost in an unclean way. Is there a better way I should be stopping things..?

Community
  • 1
  • 1
gloomy.penguin
  • 5,833
  • 6
  • 33
  • 59
  • I wrote this question about a week ago... decided to abandon it for paramiko. Just realized I need to run multiple commands with the same connection (and `&&` concatenation isn't enough). I still really don't know what to do with this. ultimately, i would like to handle the disconnect based on a command's response but none of that matters yet. – gloomy.penguin Oct 15 '14 at 15:48
  • Did you check [this](http://stackoverflow.com/questions/13920962/what-is-the-correct-way-to-close-a-twisted-conch-ssh-connection?lq=1)? It might be that you would have to implement something yourself to take care of closing the connections gracefully. Anyway, that is not related to stopping the reactor after the work is done. – koleS Oct 18 '14 at 12:15
  • @koleS - thanks, I didn't see that. I tried applying the stuff in the answer someone gave me below but the reactor occasionally stops too soon because not all connections have been added after the first one or two finish. There are just so many things going on everywhere, I'm not sure where or how to capture or manage things. – gloomy.penguin Oct 18 '14 at 18:16

1 Answers1

0

So first of all this isn't going to work, because connectTCP accepts a string with an IP address as a first argument and you are passing elements from this list:

masters = ['server1','server2','server3','server4','server5']

Stopping the reactor after all tasks are completed is pretty common use case of twisted. One way to do it would be to store a counter of tasks to execute in the factory. Every time an instance of this factory's protocol is instantiated, increase that number by one, every time the protocol instance (task) returns a result, decrease the counter, stop the reactor when the counter reaches 0. Sample code:

from twisted.conch.ssh import transport, connection, userauth, channel, common
from twisted.internet import defer, protocol, reactor
import sys, struct  

USER = 'username'
PASS = 'thisisforpersonalusesoicanstoreit!' 
CMD  = 'echo "merely this and nothing more"'


from twisted.python import log
import sys
log.startLogging(sys.stdout)


class ClientCommandTransport(transport.SSHClientTransport):
    def __init__(self, username, password, command, factory):
        self.username = username
        self.password = password
        self.command  = command
        self.factory = factory

    def verifyHostKey(self, pubKey, fingerprint):
        print fingerprint 
        return defer.succeed(True)

    def connectionSecure(self):
        self.requestService(
            PasswordAuth(self.username, self.password,
                         ClientConnection(self.command, self.factory)))

class PasswordAuth(userauth.SSHUserAuthClient):
    def __init__(self, user, password, connection):
        userauth.SSHUserAuthClient.__init__(self, user, connection)
        self.password = password

    def getPassword(self, prompt=None):
        return defer.succeed(self.password)

class ClientConnection(connection.SSHConnection):
    def __init__(self, cmd, *args, **kwargs):
        connection.SSHConnection.__init__(self)
        self.command = cmd
        self.factory = factory

    def serviceStarted(self):
        self.openChannel(CommandChannel(self.command, self.factory, conn=self))

class CommandChannel(channel.SSHChannel):
    name = 'session'

    def __init__(self, command, factory, *args, **kwargs):
        channel.SSHChannel.__init__(self, *args, **kwargs)
        self.command = command
        self.data = ''
        self.factory = factory
        self.factory.num_connections += 1
        self.factory.connections.append(self)

    def channelOpen(self, data):
        self.conn.sendRequest(
            self, 'exec', common.NS(self.command), wantReply=True).addCallback(
                                                            self._gotResponse)

    def _gotResponse(self, _):
        self.conn.sendEOF(self) 
        self.loseConnection()
        self.factory.num_connections -= 1
        self.factory.connections.remove(self)
        if self.factory.num_connections == 0:
            reactor.stop()

    def dataReceived(self, data):
        #self.data += data
        print data 

    def request_exit_status(self, data):
        (status,) = struct.unpack('>L', data)
        # print 'exit status = ', status  

class ClientCommandFactory(protocol.ClientFactory):
    def __init__(self, command=CMD):
        self.username = USER
        self.password = PASS
        self.command  = command
        self.connections = []
        self.num_connections = 0

    def buildProtocol(self, addr):
        protocol = ClientCommandTransport(
            self.username, self.password, self.command, self)
        return protocol    


masters = ['server1','server2','server3','server4','server5']

factory = ClientCommandFactory()

for server in masters:
    print server
    reactor.connectTCP(server, 22, factory)

reactor.run()

What I did here is I added two variables to the factory self.connections and self.num_connections to store references to the connections in the factory and to count the number of connections. Then in the factory's buildProtocol the factory passes itself to the ClientCommandTransport, which in turn passes the reference to the factory to ClientConnection, which finally passes the reference to the factory, where it is needed - to CommandChannel. Every time an instance of a CommandChannel is instantiated, it has a reference to the factory, so it increases the number of connections by one and adds itself to the list of connection, which is stored in the factory. I assumed that _gotResponse callback is fired, when a task/command is done. So whenever it is fired, it loses the connection as before, but now, additionally, it decreases the connection counter and removes reference to itself from the factory. It also checks if there are any other open connections, if there are not it stops the reactor.

I have not tested this code, but it is a common pattern in Twisted, that a factory keeps a list of references to protocol instances it created, so that every instance has access to other instances via the factory and is able to stop the reactor once all instance have done their work.

Note that this hierarchy is also somewhat deep, Factory -> ClientCommandTransport -> ClientConnection -> CommandChannel and I am not sure if it was the optimal solution to pass the reference to the factory all way down.

One of the variables is actually redundant - you could store either only self.num_connections and increase/decrease it or self.connections, add/remove instance from the list and use len(self.connections) to see if there are still any open connections.

koleS
  • 1,263
  • 6
  • 30
  • 46
  • oh... so what I posted does work, actually. i just can't stop the reactor once i want to complete things. I don't have time to go through your whole response right now but I'll be back in a little bit. – gloomy.penguin Oct 15 '14 at 18:41
  • your answer does look pretty good, though... kinda what I was thinking but I just couldn't do it for some reason. instead of "bubbling up" the disconnection, I guess I needed to pass things downward. You don't need to test the code for me as long as you're sure on the idea you presented. – gloomy.penguin Oct 15 '14 at 18:47
  • If you think my answer `does look pretty good` then you could accept it. I don't understand what you mean by passing things downward (what things, downward to what point, etc.). I think that the number of connections has to be stored in the factory, be all other classes are instantiated once per invocation of `buildProtocol`, so the factory is the only one entity that can keep track of number of connections. [Here](http://krondo.com/blog/?p=1209) you can find a great twisted tutorial. One of its parts, namely [this one](http://bit.ly/1Dd2wWk) presents the idea of counting tasks. – koleS Oct 15 '14 at 20:00
  • i'm gonna wait... bc `I am not sure if it was the optimal solution to pass the reference to the factory all way down.` – gloomy.penguin Oct 15 '14 at 20:02
  • You can always store the references in a global variable. ^^ – koleS Oct 15 '14 at 20:08
  • since i'm testing simple first.. my one connection is closing before the second even initializes (and therefore adds itself to the factory connections)... i guess num_connections is necessary, then? or should be used instead, since i don't have a use for storing an actual reference to the channel itself at this time. – gloomy.penguin Oct 16 '14 at 21:34