5

I'm facing a problem with an Akka supervisor actor. When the child actor throws an exception within onFailure method of a future result, the supervisor does not handle the error (I want to restart the child in the case of a ConnectException).

I'm using Akka 2.3.7.

This is the supervisor actor:

class MobileUsersActor extends Actor with ActorLogging {

  import Model.Implicits._
  import Model.MobileNotifications

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
      case _: java.net.ConnectException => {
        Logger.error("API connection error. Check your proxy configuration.")
        Restart
      }
    }

  def receive = {
    case Start => findMobileUsers
  }

  private def findMobileUsers = {
    val notis = MobileNotificationsRepository().find()
    notis.map(invokePushSender)
  }

  private def invokePushSender(notis: List[MobileNotifications]) = {
    notis.foreach { n =>
      val pushSender = context.actorOf(PushSenderActor.props)
      pushSender ! Send(n)
    }
  }

}

And this is the child actor:

class PushSenderActor extends Actor with ActorLogging {

  def receive = {
    case Send(noti) => {
      val response = sendPushNotification(noti) onFailure {
        case e: ConnectException => throw e
      }
    }
  }

  private def sendPushNotification(noti: MobileNotifications): Future[WSResponse] = {
    val message = "Push notification message example"
    Logger.info(s"Push Notification >> $message to users " + noti.users)
    PushClient.sendNotification(message, noti.users)
  }

}

I tried to notify sender with an akka.actor.Status.Failure(e) as is suggested here, but did not work, the exception keep unhandled by the supervisor.

As a workaround, I found this way to get it work:

class PushSenderActor extends Actor with ActorLogging {

  def receive = {
    case Send(noti) => {
      val response = sendPushNotification(noti) onFailure {
        case e: ConnectException => self ! APIConnectionError
      }
    }
    case APIConnectionError => throw new ConnectException
  }

  private def sendPushNotification(noti: MobileNotifications): Future[WSResponse] = {
    val message = "Push notification message example"
    Logger.info(s"Push Notification >> $message to users " + noti.users)
    PushClient.sendNotification(message, noti.users)
  }

}

Is this an Akka bug or am I doing something wrong?

Thanks!

Community
  • 1
  • 1
Gabriel Volpe
  • 101
  • 1
  • 8

1 Answers1

4

I think that the problem is that the exception thrown inside the Future doesn't belong to the same thread (potentially) as the one the Actor is running (someone more experienced can elaborate on this). So, the problem is that the exception thrown inside the Future body is "swallowed" and not propagated to the Actor. Since this is the case, the Actor doesn't fail and so there's no need to apply the supervision strategy. So, the first solution that comes to my mind is to wrap the exception inside the Future in some message, send it to yourself, and then throw it from inside the Actor context itself. This time, the Exception will be caught and the supervision strategy will be applied. Note, however, that unless you send the Send(noti) message again, you will not see the Exception happening since the Actor was restarted. All in all, the code would be like this:

class PushSenderActor extends Actor with ActorLogging {

  case class SmthFailed(e: Exception)

  def receive = {
    case Send(noti) => {
      val response = sendPushNotification(noti) onFailure {
        case e: ConnectException => self ! SmthFailed(e) // send the exception to yourself
      }
    }

    case SmthFailed(e) =>
      throw e // this one will be caught by the supervisor
  }

  private def sendPushNotification(noti: MobileNotifications): Future[WSResponse] = {
    val message = "Push notification message example"
    Logger.info(s"Push Notification >> $message to users " + noti.users)
    PushClient.sendNotification(message, noti.users)
  }

}

Hope it helped.

ale64bit
  • 6,232
  • 3
  • 24
  • 44
  • Thank you, your solution is similar to the workaround I found and it is what I'm currently using it (see last part of my post), but I think Akka must provide something to deal with instead of sending a message to self and then throw the exception. – Gabriel Volpe Nov 22 '14 at 19:48
  • @GabrielVolpe You are welcome. And I think that as long as you are inside the Future, you cannot propagate such exception to the calling thread. It's designed in order to not break it in that way; the body of the Future captures the Exception himself, and you cannot send it out. Even for using akka.actor.Status.Failure(e), you would have to capture it and wrap it yourself. But I hope there's a clever way ))) – ale64bit Nov 22 '14 at 19:52