Skip to content

Source⚓︎

The workload Source models the first stage of a processing pipeline. A Source reads workflow inputs from a specified location or service in the cloud.

User Guide⚓︎

You can configure the type of Source used in your workload by changing the source attribute of your workload request.

Terra DataRepo Source⚓︎

You can configure workflow-launcher to fetch workflow inputs from a Terra Data Repository (TDR) dataset in real-time using the Terra DataRepo source.

Terra DataRepo source polls a dataset.table's row metadata table for newly ingested rows. It snapshots those rows for downstream processing by an Executor. The Terra DataRepo source can only read inputs from a single table.

When you start the workload, the Terra DataRepo source will start looking for new rows from that instant.

When you stop the workload, the Terra DataRepo source will stop looking for new rows from that instant. All pending snapshots may continue to be processed by a later workload stage.

Note

workflow-launcher creates snapshots of your data to be processed by a later stage of the workload. Therefore, you must ensure the account workflow-launcher@firecloud.org is a custodian of your dataset.

A typical Terra DataRepo source configuration in the workload request looks like: json { "name": "Terra DataRepo", "dataset": "{dataset-id}", "table": "{dataset-table-name}", "snapshotReaders": [ "{user}@{domain}", ... ], "pollingIntervalMinutes": 1, "loadTag": "{load-tag-to-poll}" } The table below summarises the purpose of each attribute in the above request.

Attribute Description
name Selects the Terra DataRepo source implementation.
dataset The UUID of dataset to monitor and read from.
table The name of the dataset table to monitor and read from.
snapshotReaders A list of email addresses to set as readers of all snapshots created in this workload.
pollingIntervalMinutes Optional. Rate at which WFL will poll TDR for new rows to snapshot.
loadTag Optional. Snapshot only new rows ingested with loadTag.

dataset⚓︎

The UUID uniquely identifying the TDR dataset which WFL will poll for new workflow inputs.

table⚓︎

The name of the table in dataset which WFL will poll for new workflow inputs.

You should design this table such that each row contains all the inputs required to execute a workflow by the workload Executor downstream.

snapshotReaders⚓︎

The email addresses of those whom should be "readers" of all snapshots created by workflow-launcher in this workload. You can specify individual users and/or Terra/Firecloud groups.

Optional pollingIntervalMinutes⚓︎

The rate, in minutes, at which WFL will poll TDR for new rows to snapshot. If not provided, the default interval is 20 minutes.

Optional loadTag⚓︎

Specifying a loadTag restricts the set of new rows that WFL will include in a snapshot. The loadTag specifies a prefix to be matched against the loadTag used to ingest data to TDR. Thus my-ingest-tag matches all of the following load tags.

- `my-ingest-tag`
- `my-ingest-tag-`
- `my-ingest-tag-0`
- `my-ingest-tag-a`
- `my-ingest-tag-2022-07-14

When not provided, all new rows will be eligible for snapshotting.

Note

When initiating a TDR ingest, one can optionally set a load tag. Each row ingested will have this load tag set in its corresponding TDR row metadata table (only visible in BigQuery and not TDR UI). The same load tag can be reused across multiple ingests to logically link them.

For questions on how to set the load tag for a particular flavor of ingest, contact #dsp-jade

TDR Snapshots Source⚓︎

You can configure workflow-launcher to use a list of TDR snapshots directly. This may be useful if you don't want workflow-launcher to be a custodian of your dataset or if you already have snapshots you want to process. In this case you must ensure that workflow-launcher@firecloud.org is a reader of all snapshots you want it to process.

A typical TDR Snapshots source configuration in the workload request looks like: json { "name": "TDR Snapshots", "snapshots": [ "{snapshot-id}", ... ] }

The table below summarises the purpose of each attribute in the above request.

Attribute Description
name Selects the TDR Snapshots source implementation.
snapshots A List of UUIDs of snapshots to process.

Note

You must ensure that the snapshots you list are compatible with the downstream processing stage that consumes them.

Developer Guide⚓︎

A source is a Queue that satisfies the Source protocol below: clojure (defprotocol Source (start-source! ^Unit [^Connection transaction ;; JDBC Connection ^Source source ;; This source instance ] "Start enqueuing items onto the `source`'s queue to be consumed by a later processing stage. This operation should not perform any long-running external effects other than database operations via the `transaction`. This function is called at most once during a workload's operation.") (stop-source! ^Unit [^Connection transaction ;; JDBC Connection ^Source source ;; This source instance ] "Stop enqueuing inputs onto the `source`'s queue to be consumed by a later processing stage. This operation should not perform any long-running external effects other than database operations via the `transaction`. This function is called at most once during a workload's operation and will only be called after `start-source!`. Any outstanding items on the `source` queue may still be consumed by a later processing stage.") (update-source! ^Workload [^Workload workload] "Enqueue items onto the `workload`'s source queue to be consumed by a later processing stage unless stopped, performing any external effects as necessary. Implementations should avoid maintaining in-memory state and making long- running external calls, favouring internal queues to manage such tasks asynchronously between invocations. This function is called one or more times after `start-source!` and may be called after `stop-source!`"))

Note

The Source protocol is implemented by a set of multimethods of the same name. The use of a protocol is to illustrate the difference between the in-memory data model of a Source and the metadata seen by a user.

To be used in a workload, a Source implementation should satisfy the processing Stage protocol and the to-edn multimethod in addition to the following multimethods specific to sinks:

``clojure (defmulti create-source "Create aSourceinstance using the databasetransactionand configuration in the sourcerequestand return a[type items]pair to be written to a workload record assource_typeandsource_items. Notes: - This is a factory method registered for workload creation. - TheSourcetype string must match a value of thesourceenum in the database schema. - This multimethod is type-dispatched on the:nameassociation in therequest`." (fn ^[^String ^String] [^Connection transaction ;; JDBC Connection ^long workload-id ;; ID of the workload being created ^IPersistentHashMap request ;; Data forwarded to the handler ] (:name request)))

(defmulti load-source! "Return the Source implementation associated with the source_type and source_items fields of the workload row in the database. Note that this multimethod is type-dispatched on the :source_type association in the workload." (fn ^Sink [^Connection transaction ;; JDBC Connection ^IPersistentHashMap workload ;; Row from workload table ] (:source_type workload))) ```

Back to top