As presented in a prior post, Singer.io is a modern, open-source ETL (Extract, Transform and Load) framework for integrating data from various sources, including online datasets and services, with a focus on being simple and light-weight. The basics of the framework were explored in our last post on the topic, so we will refer you to that if you are unfamiliar.

This post is about our process for deploying Singer to the cloud, more specifically, to the Cloud Foundry open source cloud application platform. This was done in the context of researching the maturity of data transformation tools in a cloud-native environment. We will explore the options for deploying Singer taps and targets to a cloud provider and discuss our implementation and deployment process in detail.

Exploring our options

Several ideas were considered to deploy Singer tap-target pairs to the cloud. We quickly found out two options that did not work, and those are:

  • Simply deploying the tap and target as separate apps with no changes
  • Deploying the whole pair as one app

The background for why this second option was not viable has a fairly straightforward explanation. Singer is a specification. It merely specifies the message format that the tap and target use in their communication, thus allowing taps to communicate with any target and vice versa. The way this is implemented is for a tap’s standard output to be piped into a target’s standard input.

Additionally, the maintenance state of the different taps and targets varies, with some instances of conflicting dependencies not being unheard of, which Singer themselves recommend solving by running each half of the pipeline in a separate Python virtual environment.

Options we considered

The first obvious solution was to create a Docker container with the tap and target pair, giving us the freedom to run the pair and the needed virtual environments. The drawbacks of this method were that we effectively lose the flexibility of Singer by binding the tap with the target.

The second attempt had to be more invasive to the code of Singer taps and targets, as we decided to implement cloud-native versions of these apps that communicate via the network. To better match Singer’s format, gRPC was the method we selected.

Modifying Singer to use gRPC

We will use the exchange rates API tap and the CSV target as a starting point, due to their simplicity in terms of code. To set up our gRPC-Singer, we needed a prototype file to define our service:

package singer;

// Interface exported by the server.
service Singer {
  // A client-to-server streaming RPC.
  rpc Sing(stream Message) returns (Status) {}
}
message Message {
  string blob = 1;
}

message Status {
  string status = 1;
}

We chose string as the message format as Singer targets already parse messages from strings. The status message is not that important to us.

As can be seen by the service definition, the Sing RPC method receives a stream of messages. The next step is to generate the Python code we need to implement this service. The command is as follows:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./singer.proto

Now is the time to modify the code to perform the gRPC, starting from the tap. Here we need to transform the function that writes the messages to standard output, to a Python generator. To do this we first need to replace two of Singer’s utility functions:

def write_record(stream_name, record, stream_alias=None, time_extracted=None):
    return singer_pb2.Message(blob=json.dumps(singer.RecordMessage(stream=(stream_alias or stream_name),
                                record=record,
                                time_extracted=time_extracted).asdict()))


def write_schema(stream_name, schema, key_properties, bookmark_properties=None, stream_alias=None):
    if isinstance(key_properties, (str, bytes)):
        key_properties = [key_properties]
    if not isinstance(key_properties, list):
        raise Exception("key_properties must be a string or list of strings")

    return singer_pb2.Message(blob=json.dumps(singer.SchemaMessage(
            stream=(stream_alias or stream_name),
            schema=schema,
            key_properties=key_properties,
            bookmark_properties=bookmark_properties).asdict()))

To make the function a generator, we need it to yield these messages:

...
yield write_schema('exchange_rate', schema, 'date')
...
yield write_record('exchange_rate', record)
...

Finally, we need the code to make it an aiohttp server to receive our request, and the code that actually makes the RPC call.

@routes.post('/')
async def main(request):
    params = await request.json()
    config = params['config']
    target = params['target']

    state = {}

    start_date = state.get('start_date') or config.get('start_date') or datetime.utcnow().strftime(DATE_FORMAT)
    start_date = singer.utils.strptime_with_tz(start_date).date().strftime(DATE_FORMAT)
    with grpc.insecure_channel(f'{target}.apps.internal:8080') as channel:
            stub = singer_pb2_grpc.SingerStub(channel)
            response = do_sync(config.get('base', 'USD'), start_date)
            status = stub.Sing(response)
    return web.json_response(status.status)

Now on to the target. Turns out the modifications here are less intrusive, but we still need to tell the target that the actual Singer message is now a property of the message we are sending in our stream:

message = message.blob

And of course, we need to set up our gRPC server:

class SingerServicer(singer_pb2_grpc.SingerServicer):

    def Sing(self, request_iterator, context):
        config = {}
        persist_messages(config.get('delimiter', ','),
                                 config.get('quotechar', '"'),
                                 request_iterator,
                                 config.get('destination_path', ''))
        return singer_pb2.Status(status = "OK")


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    singer_pb2_grpc.add_SingerServicer_to_server(
        SingerServicer(), server)
    server.add_insecure_port('[::]:8080')
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()

Deploying to Cloud Foundry

Deploying these apps to Cloud Foundry could have been very straightforward, but we wanted our taps and targets to share Singer’s philosophy of being generic. Unfortunately, that means any new taps need to follow the service definition we just wrote, but at least we can make them work with any target. For the case of Cloud Foundry, we can achieve this with Container to Container Networking.

An internal route to the target needs to be set up:

cf map-route singergrpctarget apps.internal --hostname singergrpctarget

In addition, network policies that allow the tap to reach the target need to be created.

cf add-network-policy singergrpctap --destination-app singergrpctarget --port 8080 --protocol tcp

With this, the tap can resolve the target at singergrpctarget.apps.internal.

Running traditional Singer the user is the only one aware of the combination of tap and target used, but in our case, the tap needs to know where to send the gRPC call. For this purpose, the request to the target needs to have the target, and the configuration for the tap.

{
    "target": "singergrpctarget",
    "config": {
        "base": "CHF",
        "start_date": "2020-02-20"
    }
}

Because of our internal networking, the tap knows that it can resolve the target at {target}.apps.internal.

Cloud Foundry dashboard view of the gRPC tap receiving the sample request.
The Singer gRPC tap receiving the above request
Cloud Foundry dashboard view of the gRPC target.
Our gRPC target in the Cloud Foundry dashboard

Closing thoughts

Deploying Singer.io to the cloud was not straightforward, and the lack of a traditional comprehensive API meant that a generic wrapper could not be created trivially, but we have demonstrated a possible method to convert taps and targets to a more cloud-friendly format and hope to inspire more usage of the framework in a cloud-native context.

Our experiences with Singer and Cloud Foundry also prompted an interesting observation, that while local (CLI) applications can very easily exchange information by being piped into each other, the same cannot be said about PaaS applications. While several solutions, including our gRPC implementation or fully integrated frameworks that negate the need for a data flow, are possible, there is still development cost associated with implementing a data pipeline between different applications, an aspect inviting future work in streamlining the process.