Building a Singer.io tap for an open data source

Singer.io is an open-source JSON-based data shifting (ETL: extract, transform, load) framework, designed to bring simplicity when moving data between a source and a destination service on the Internet. In this post, we present the framework as entry point into the world of SaaS-level data exchange and some associated research questions.

A script that extracts data from a source (like Stripe payments) and outputs it following the Singer format convention is called a tap. A script that consumes that output and works with it (like putting it into a .csv file or a PostgreSQL database) is called a target.

The idea is that, using the defined convention based on JSON data exchange, any tap can be connected to any target. With this in mind, our goal is to see how easy it is to create a tap and connect it to an existing target, as a first step in the evaluation of this framework for prototyping SaaS-level applications working with diverse data sources. In this context, Singer is placed on a higher level compared to general data orchestration frameworks for cloud environments such as Alluxio, and links up with many multi-cloud and cross-cloud ideas but is also subject to the same issues such as the least common denominator problem.

Although there are a lot of already made taps to download and use, those are useful only if you already have meaningful data in any of those sources. For our testing purposes, that was simply not the case.

So today we’re going to build a simple Python tap that takes data from an open data source and outputs it using the Singer spec.

Creating a tap from the template

In order to develop a tap, we first need to install the Singer library:

pip install singer-python

Next, we’re installing cookiecutter and downloading the provided tap template, to have something to start with:

pip install cookiecutter
cookiecutter https://github.com/singer-io/singer-tap-template.git
project_name [e.g. 'tap-facebook']: tap-weather
package_name [tap_weather]:

After downloading the template, cookiecutter will ask for a project name. We chose tap-weather, because our data source will be weather related. Then a package name is required- pressing enter will use the suggested name.

We now have a blank canvas with a few useful functions to get started with. To follow the singer spec, a tap must output two different types of messages:

  • Schema messages: They describe the datatypes of the data in the stream.
  • Record messages: They contain the data to be streamed.

Additionally, state messages can be streamed, as a way to preserve states between tap executions. A tap can also take some .json input files, like a config file, used typically for API credentials.

Defining the schemas

Next step is choosing the source from where the data will be obtained, and understanding the structure and nature of the data. We selected the wind measurement provided by the Federal Office of Meteorology and Climatology MeteoSwiss and published regularly on opendata.swiss. This is a geospatial dataset with strong regional differences, thus using GeoJSON (RFC 7946) as appropriate format.

Taking a look at the .json that we’ll get, it looks something like this:

{
   "crs": {
       "type": "name",
       "properties": {
           "name": "EPSG:21781"
       }
   },
   "license": "https://...",
   "mapname": "ch.meteoschweiz.messwerte-wind-boeenspitze-kmh-10min",
   "map_long_name": "Measurement values wind gust 1 s, 10 min maximum",
   "map_short_name": "Wind gust 1 s, 10 min",
   "map_abstract": "Current measurement values of Wind gust 1 s, 10 min",
   "creation_time": "13.12.2019 14:15",
   "type": "FeatureCollection",
   "features": [
       {
           "type": "Feature",
           "geometry": {
               "type": "Point",
               "coordinates": [
                   771035.92,
                   184826.09
               ]
           },
           "id": "ARO",
           "properties": {
               "station_name": "Arosa",
               "station_symbol": 1,
               "value": 11.9,
               "wind_direction": 54,
               "wind_direction_radian": 0.942478,
               "unit": "km/h",
               "reference_ts": "2019-12-13T13:10:00Z",
               "altitude": "1888.00",
               "measurement_height": "10.00 m",
               "description": "..."
      },
     ...
  ]
}

So now we’re going to define the JSON schemas that will be part of our output, to let the targets know how our data is structured. From that .json we only extracted the “features”, which define each station, its location and wind measurement, along with some other properties. The schema is defined as follows:

{
    "type": "object",
    "properties": {
        "type": {
            "type": "string"
        },
        "geometry": {
            "type": "object",
            "properties": {
                "type": {
                    "type": "string"
                },
                "coordinates": {
                    "type": "array",
                    "items": {
                        "type": "number"
                    },
                    "minItems": 2,
                    "maxItems": 2
                }
            }
        },
        "id": {
            "type": "string",
            "minLength": 3,
            "maxLength": 3
        },
        ...
    }
}

We named this file features.json and put it inside the schemas folder.

In this case we copied each property as-is, but here we have the freedom to choose what properties the record messages will have (with the condition we’ll have to generate the records with that same structure afterwards)

Building the tap

Next we have to create the code that will generate the schema and records from our input.

def main():
   schemas = load_schemas()
   singer.write_schema('features', schemas.get('features'), 'station_properties')
  

   with urllib.request.urlopen('https://data.geo.admin.ch/ch.meteoschweiz.messwerte-wind-boeenspitze-kmh-10min/ch.meteoschweiz.messwerte-wind-boeenspitze-kmh-10min_en.json') as response:
       data = json.loads(response.read().decode())
      
       singer.write_records('features', data.get('features'))

First we get the schemas with the load_schemas() function, which is already provided by the template, and returns a dictionary with all the schemas found in the respective folder.

Then we output the schema with the write_schema function. The first argument is the name of the schema, the second one is the JSON schema itself and the third one is a list of the primary keys for the schema.

Finally we get the data and use the write_records function to output them all. The first argument is the schema name, and the second one the list of objects according to that schema. This part would be different if a different schema structure was chosen, as we would have to traverse the whole features array and build each record separately (which can be done with a similar singer function, write_record).

Running the tap

Let’s create a virtual environment to run our tap on

cd tap-weather
python3 -m venv ~/.virtualenvs/tap-weather
source ~/.virtualenvs/tap-weather/bin/activate
pip install -e
deactivate

We can now try our newly created tap with

 ~/.virtualenvs/tap-weather/bin/tap-weather

As expected, we get our schema message, and a record message for every feature retrieved from the initial .json 🙂

Connecting the tap to existing targets

Based on the Singer premise, any target should work, as long as you are careful with the config files needed to make them work. For this we used the .csv target.

When running a target, it is recommended to create a different virtual environment, so as not to create dependency conflicts.

python3 -m venv ~/.virtualenvs/target-csv
source ~/.virtualenvs/target-csv/bin/activate
pip install target-csv
deactivate

Finally, we can try tap and target together

~/.virtualenvs/tap-weather/bin/tap-weather | ~/.virtualenvs/target-csv/bin/target-csv

A .csv file will be created in the current directory containing all the fetched records

Final thoughts

Singer turned out to be a really easy to understand framework, and in theory, has a lot of potential to achieve its purpose! It’s open-source and, at the time of writing this post, has 72 taps and 10 targets ready to install and use. But, some problems may arise when trying to use those, as not all of them are kept up to date. Case in point, we tried to use the Google Sheets target with our tap, and it wouldn’t work when data included floating point values.

Otherwise, the Singer libraries and templates are fairly well documented and maintained, so the process to create a new tap was pretty straight-forward. Open questions thus include: What is the dynamics and the quality of the ecosystem around taps and targets development? How well does the framework scale for larger quantities of data? How extensible is the framework in case data processing and analytics are to be performed? What are alternative frameworks such as Talend Open Studio or Node Red, and how do they compare? In the Service Prototyping Lab at Zurich University of Applied Sciences, we will investigate these questions in the coming months.


7 Kommentare

  • Hi, the step in ‘building the tap’. The code here, where is this added to? Or should I save this as a separate script? and save/run from where?

    • The code replaces the main() function in the tap’s __init__.py file. Keep in mind that, as this was a simple test, it didn’t take into account config files or catalogs.

      • thanks very much!

        I then get onto the next section, run the command: pip install -e and get this error:

        (venv) C:\Users\Ben\AppData\Local\Google\Cloud SDK\tap-weather>pip install -e

        Usage:
        pip install [options] [package-index-options] …
        pip install [options] -r [package-index-options] …
        pip install [options] [-e] …
        pip install [options] [-e] …
        pip install [options] …

        -e option requires 1 argument

        ————————————————————————
        If I add a / to the end I get the error:

        (venv) C:\Users\Ben\AppData\Local\Google\Cloud SDK\tap-weather>pip install -e /
        ERROR: File “setup.py” not found. Directory cannot be installed in editable mode: C:\

        How can i fix this editable mode issuee, ddo you know?

  • Hello there,
    thanks for the tutorial!

    If i try to execute:
    ~/.virtualenvs/tap-weather/bin/tap-weather

    It unfortunatelly ends with the following:

    Traceback (most recent call last):

    File “/home/dueb/.virtualenvs/tap-weather/lib/python3.5/site-packages/pkg_resources/__init__.py”, line 853, in resolve
    raise DistributionNotFound(req, requirers)
    pkg_resources.DistributionNotFound: The ‘tap-weather’ distribution was not found and is required by the application

    • Hello! If the tap installation went well, I’m not really sure what the underlying issue could be. But just to be safe, make sure your ‘tap-weather’ folder (the one created via cookiecutter) and all of its contents still exist, and haven’t changed name or location since you first installed the tap in the virtual environment. Notice that the installation creates the ‘tap_weather.egg-info’ folder, which is also needed to run the tap.


Leave a Reply

Your email address will not be published. Required fields are marked *