AMPS Python Client 5.3.4

Copyright 2013-2024 60East Technologies, Inc.

SOW and Subscribe

Imagine an application that displays real-time information about the position and status of a fleet of delivery vans. When the application starts, it should display the current location and status of each van. As vans move around the city and post other status updates, the application should keep its display up to date. Vans upload information to the system by publishing messages to the van_location topic, which is configured on the AMPS server with a key of van_id.
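
To make the role of the key concrete, here is a toy, in-memory model of a topic keyed on van_id. It only illustrates last-value-per-key behavior and is not how AMPS stores SOW data. The ToySowTopic class and the sample field values are made up for this sketch.

```python
import json


class ToySowTopic:
    """Toy stand-in for a SOW topic: keeps the latest message per key."""

    def __init__(self, key_field):
        self.key_field = key_field
        self.records = {}

    def publish(self, data):
        # A message with an existing key replaces the stored record;
        # a message with a new key adds a record.
        record = json.loads(data)
        self.records[record[self.key_field]] = record

    def query(self):
        # Return the current record for every key.
        return list(self.records.values())


van_location = ToySowTopic(key_field="van_id")
van_location.publish('{"van_id": 1, "status": "ACTIVE", "lat": 40.71, "lon": -74.00}')
van_location.publish('{"van_id": 2, "status": "ACTIVE", "lat": 40.73, "lon": -73.99}')
# A second message for van 1 replaces the first one instead of adding a record.
van_location.publish('{"van_id": 1, "status": "ACTIVE", "lat": 40.72, "lon": -74.01}')

snapshot = van_location.query()
```

After the three publishes, the snapshot holds one record per van, which is why a query at startup is enough to draw every van at its latest known position.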

In this application, it is important not only to stay up to date on the latest information about each van, but also to ensure that all of the active vans are displayed as soon as the application starts. Combining a SOW query with a subscription to the topic is exactly what is needed, and that is accomplished by the sow_and_subscribe command. As with the other methods for receiving messages, the AMPS Python client provides a basic, synchronous form of sow_and_subscribe that provides you with a MessageStream to iterate over, and an asynchronous form that requires a message handler.

First, let's look at an example that uses the basic form of sow_and_subscribe:

from AMPS import Command, Message


def report_van_position(client):
    # Issue a sow_and_subscribe command to begin receiving information about all
    # of the active delivery vans in the system. All of the vans in the system
    # now are returned as Message objects whose get_command returns sow. New
    # messages coming in are returned as Message objects whose get_command
    # returns publish.
    #
    # Notice that we specify the oof option on the command. Setting this option
    # causes AMPS to send Out-of-Focus (OOF) messages for the topic. OOF messages
    # are sent when an entry that was sent to us in the past no longer matches
    # our query. This happens when an entry is removed from the SOW cache via a
    # sow_delete operation, when the entry expires (as specified by the
    # expiration time on the message or by the configuration of that topic on
    # the AMPS server), or when the entry no longer matches the content filter
    # specified. In our case, if a van's status changes to something other than
    # ACTIVE, it no longer matches the content filter and becomes out of focus.
    # When this occurs, a Message is sent with Command set to oof. We use OOF
    # messages to remove vans from the display as they become inactive, expire,
    # or are deleted.
    command = (Command("sow_and_subscribe")
               .set_topic("van_location")
               .set_filter("/status = 'ACTIVE'")
               .set_batch_size(100)
               .set_options("oof"))

    for message in client.execute(command):
        if (message.get_command() == Message.Command.SOW or
                message.get_command() == Message.Command.Publish):
            # For each of these messages we call add_or_update_van(), an
            # application-defined function that presumably adds the van to our
            # application's display. As vans send updates to the AMPS server,
            # those are also received by the client because of the subscription
            # placed by sow_and_subscribe. Our application does not need to
            # distinguish between updates and the original set of vans we found
            # via the SOW query, so we use add_or_update_van() to display the
            # new position of vans as well.
            add_or_update_van(message)
        elif message.get_command() == Message.Command.OOF:
            # remove_van() is also application-defined.
            remove_van(message)
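
The example leaves add_or_update_van() and remove_van() to the application. One possible sketch follows, assuming JSON message bodies that carry a van_id field (including on OOF messages) and a plain dictionary standing in for the display. The displayed_vans dictionary and the FakeMessage class are hypothetical; FakeMessage exists only so the handlers can be tried without a server, and the handlers rely on nothing beyond a get_data() method.

```python
import json

# Hypothetical display state: van_id -> latest known record.
displayed_vans = {}


def add_or_update_van(message):
    # Assumes the message body is JSON with a van_id field.
    van = json.loads(message.get_data())
    displayed_vans[van["van_id"]] = van


def remove_van(message):
    # Assumes the message body still identifies the van that went out of focus.
    van = json.loads(message.get_data())
    displayed_vans.pop(van["van_id"], None)


class FakeMessage:
    """Stand-in for a received message, for trying the handlers offline."""

    def __init__(self, data):
        self._data = data

    def get_data(self):
        return self._data


add_or_update_van(FakeMessage('{"van_id": 7, "status": "ACTIVE"}'))
add_or_update_van(FakeMessage('{"van_id": 9, "status": "ACTIVE"}'))
# Van 7 stops matching the filter, so the application would receive an OOF message.
remove_van(FakeMessage('{"van_id": 7, "status": "IDLE"}'))
```

Because both functions are keyed on van_id, the same add_or_update_van() call serves the initial SOW results and later updates, and remove_van() is safe to call for a van that is not currently displayed.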

Now we will look at an example that uses the asynchronous form of sow_and_subscribe:

def update_van_position(message):
    if (message.get_command() == Message.Command.SOW or
        message.get_command() == Message.Command.Publish):
        add_or_update_van(message)
    elif message.get_command() == Message.Command.OOF:
        remove_van(message)


def subscribe_to_van_location(client):
    # Same command as the synchronous example; update_van_position is the
    # message handler that receives each message.
    client.execute_async(
        Command("sow_and_subscribe")
            .set_topic("van_location")
            .set_filter("/status = 'ACTIVE'")
            .set_batch_size(100)
            .set_options("oof"),
        update_van_position)

Notice that the two forms have the same result, but they differ in where the processing runs. The asynchronous form (execute_async) calls the message handler on the client's background thread, which blocks the client from receiving further messages while the handler runs. The synchronous form (execute) processes messages on the calling thread and allows the background thread to continue to receive messages while processing occurs. In both cases, the calls to add_or_update_van and remove_van receive the same data.
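
If the work done for each message is slow, one common way to keep a handler from stalling the thread that calls it is to have the handler only enqueue the work and let a separate worker thread do the processing. The sketch below uses only the Python standard library and is not part of the AMPS API: enqueue_update, worker, and the string payloads are hypothetical. It passes plain strings rather than message objects between threads.

```python
import queue
import threading

work_queue = queue.Queue()
processed = []


def enqueue_update(payload):
    # Stand-in for a message handler: do no slow work here, only hand off.
    work_queue.put(payload)


def worker():
    # Runs on its own thread and performs the (possibly slow) processing.
    while True:
        payload = work_queue.get()
        if payload is None:  # sentinel: stop the worker
            break
        processed.append(payload.upper())


worker_thread = threading.Thread(target=worker)
worker_thread.start()

for payload in ["van 1 moved", "van 2 moved"]:
    enqueue_update(payload)

work_queue.put(None)
worker_thread.join()
```

The trade-off is that updates are applied slightly later than they are received, in exchange for a handler that returns quickly.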

Last updated 3 months ago