Source
The workload Source models the first stage of a processing pipeline.
A Source reads workflow inputs from a specified location
or service in the cloud.
User Guide
You can configure the type of Source used in your workload by changing the
source attribute of your workload request.
Terra DataRepo Source
You can configure workflow-launcher to fetch workflow inputs
from a Terra Data Repository (TDR) dataset in real-time
using the Terra DataRepo source.
The Terra DataRepo source polls the row metadata table of the configured
dataset table for newly ingested rows.
It snapshots those rows for downstream processing by an Executor.
The Terra DataRepo source can only read inputs from a single table.
When you start the workload, the Terra DataRepo source will start looking
for new rows from that instant.
When you stop the workload, the Terra DataRepo source will stop looking
for new rows from that instant. All pending snapshots may continue
to be processed by a later workload stage.
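In effect, starting and stopping the workload bounds a time window on ingestion. The sketch below illustrates that window only. It assumes each row is a map with a hypothetical `:ingest-time` in epoch milliseconds, and `in-polling-window?` and `candidate-rows` are illustrative names rather than WFL functions.

```clojure
(defn in-polling-window?
  "True when `row` was ingested at or after `started-ms` and, if the workload
  has been stopped (`stopped-ms` non-nil), strictly before `stopped-ms`.
  The :ingest-time key is a hypothetical field used for illustration."
  [started-ms stopped-ms row]
  (let [t (:ingest-time row)]
    (and (<= started-ms t)
         (or (nil? stopped-ms) (< t stopped-ms)))))

(defn candidate-rows
  "Rows that a source started at `started-ms` (and optionally stopped at
  `stopped-ms`) would treat as new."
  [started-ms stopped-ms rows]
  (filterv #(in-polling-window? started-ms stopped-ms %) rows))
```

For example, `(candidate-rows 100 nil [{:id 1 :ingest-time 50} {:id 2 :ingest-time 150}])` keeps only the row with `:id 2`, because row 1 was ingested before the workload started.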
Note
workflow-launcher creates snapshots of your data to be processed by a
later stage of the workload. Therefore, you must ensure the account
workflow-launcher@firecloud.org is a custodian of your dataset.
A typical Terra DataRepo source configuration in the workload request looks
like:
```json
{
  "name": "Terra DataRepo",
  "dataset": "{dataset-id}",
  "table": "{dataset-table-name}",
  "snapshotReaders": [
    "{user}@{domain}",
    ...
  ],
  "pollingIntervalMinutes": 1,
  "loadTag": "{load-tag-to-poll}"
}
```
The table below summarises the purpose of each attribute in the above request.
| Attribute | Description |
|---|---|
| `name` | Selects the Terra DataRepo source implementation. |
| `dataset` | The UUID of the dataset to monitor and read from. |
| `table` | The name of the dataset table to monitor and read from. |
| `snapshotReaders` | A list of email addresses to set as readers of all snapshots created in this workload. |
| `pollingIntervalMinutes` | Optional. Rate, in minutes, at which WFL will poll TDR for new rows to snapshot. |
| `loadTag` | Optional. Snapshot only new rows ingested with a load tag that starts with `loadTag`. |
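As a sketch of how the table above might be checked before creating a workload, the helpers below treat every attribute not marked Optional as required. That reading, the keywordized keys (as if the JSON request were parsed with keyword keys), and the names `missing-datarepo-keys` and `datarepo-source?` are assumptions for illustration, not WFL's actual validation.

```clojure
(def required-datarepo-keys
  ;; Attributes the table does not mark "Optional." (assumed to be required).
  [:name :dataset :table :snapshotReaders])

(defn missing-datarepo-keys
  "Return the required attributes absent from a Terra DataRepo `source` map,
  in the order they appear in `required-datarepo-keys`."
  [source]
  (filterv #(not (contains? source %)) required-datarepo-keys))

(defn datarepo-source?
  "True when `source` selects the Terra DataRepo implementation."
  [source]
  (= "Terra DataRepo" (:name source)))
```

For example, `(missing-datarepo-keys {:name "Terra DataRepo" :dataset "d"})` returns `[:table :snapshotReaders]`.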
dataset
The UUID uniquely identifying the TDR dataset which WFL will poll
for new workflow inputs.
table
The name of the table in `dataset` which WFL will poll
for new workflow inputs.
You should design this table such that
each row contains all the inputs the downstream workload Executor
requires to execute a workflow.
snapshotReaders
The email addresses of those who should be "readers" of all snapshots created by workflow-launcher in this workload. You can specify individual users and/or Terra/Firecloud groups.
Optional pollingIntervalMinutes
The rate, in minutes, at which WFL will poll TDR for new rows to snapshot. If not provided, the default interval is 20 minutes.
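A minimal sketch of how the interval and its 20-minute default could decide when the next poll is due. The function names and the millisecond bookkeeping are hypothetical; WFL's own scheduling may differ.

```clojure
(def default-polling-interval-minutes
  ;; Default used when pollingIntervalMinutes is omitted.
  20)

(defn polling-interval-ms
  "Polling interval of a source map, converted to milliseconds."
  [source]
  (* 60 1000 (get source :pollingIntervalMinutes default-polling-interval-minutes)))

(defn poll-due?
  "True when at least one polling interval has elapsed since `last-poll-ms`.
  A source that has never polled (`last-poll-ms` is nil) is always due."
  [source last-poll-ms now-ms]
  (or (nil? last-poll-ms)
      (>= (- now-ms last-poll-ms) (polling-interval-ms source))))
```

With no `pollingIntervalMinutes`, `(polling-interval-ms {})` is 1,200,000 ms (20 minutes); with `{:pollingIntervalMinutes 1}` it is 60,000 ms.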
Optional loadTag
Specifying a loadTag
restricts the set of new rows
that WFL will include in a snapshot.
The loadTag specifies
a prefix to be matched
against the loadTag used
to ingest data to TDR.
Thus `my-ingest-tag` matches
all of the following load tags:
- `my-ingest-tag`
- `my-ingest-tag-`
- `my-ingest-tag-0`
- `my-ingest-tag-a`
- `my-ingest-tag-2022-07-14`
When not provided, all new rows will be eligible for snapshotting.
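The prefix rule above can be sketched as a predicate. This is an illustration of the documented matching behaviour, not WFL's implementation; `load-tag-eligible?` is a hypothetical name, and treating a row with no load tag as ineligible when a prefix is set is an assumption.

```clojure
(ns example.load-tag
  (:require [clojure.string :as str]))

(defn load-tag-eligible?
  "True when a row ingested with `row-load-tag` may be snapshotted.
  A nil `load-tag-prefix` (loadTag not provided) makes every row eligible."
  [load-tag-prefix row-load-tag]
  (or (nil? load-tag-prefix)
      (and (some? row-load-tag)
           (str/starts-with? row-load-tag load-tag-prefix))))
```

Filtering rows then looks like `(filter #(load-tag-eligible? "my-ingest-tag" (:load-tag %)) rows)`, where `:load-tag` is again a hypothetical row key.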
Note
When initiating a TDR ingest, you can optionally set a load tag. Each ingested row will have this load tag set in its corresponding TDR row metadata table (visible only in BigQuery, not in the TDR UI). The same load tag can be reused across multiple ingests to logically link them.
For questions on how to set the load tag for a particular flavor of ingest, contact #dsp-jade.
TDR Snapshots Source
You can configure workflow-launcher to use a list of TDR snapshots directly.
This may be useful if you don't want workflow-launcher to be a custodian of your
dataset or if you already have snapshots you want to process. In this case, you
must ensure that workflow-launcher@firecloud.org is a reader of all
snapshots you want it to process.
A typical TDR Snapshots source configuration in the workload request looks
like:
```json
{
  "name": "TDR Snapshots",
  "snapshots": [
    "{snapshot-id}",
    ...
  ]
}
```
The table below summarises the purpose of each attribute in the above request.
| Attribute | Description |
|---|---|
| `name` | Selects the TDR Snapshots source implementation. |
| `snapshots` | A list of UUIDs of snapshots to process. |
Note
You must ensure that the snapshots you list are compatible with the downstream processing stage that consumes them.
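A minimal sketch of checking a TDR Snapshots request and turning it into one work item per snapshot. The canonical 8-4-4-4-12 UUID check, the `{:snapshot id}` item shape, and the helper names are assumptions for illustration; the document does not specify how WFL represents these items.

```clojure
(def uuid-pattern
  ;; Canonical 8-4-4-4-12 hexadecimal UUID text, case-insensitive.
  #"(?i)[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")

(defn invalid-snapshot-ids
  "Return the entries of `:snapshots` that are not canonical UUID strings."
  [source]
  (filterv #(not (and (string? %) (re-matches uuid-pattern %)))
           (:snapshots source)))

(defn snapshot-items
  "Turn a TDR Snapshots source map into one hypothetical item per snapshot."
  [source]
  (mapv (fn [id] {:snapshot id}) (:snapshots source)))
```

A syntactically valid UUID says nothing about whether workflow-launcher can read the snapshot or whether it suits the downstream stage, so those checks remain your responsibility.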
Developer Guide
A source is a Queue that satisfies the Source protocol below:
```clojure
(defprotocol Source
  (start-source!
    ^Unit
    [^Connection transaction  ;; JDBC Connection
     ^Source     source       ;; This source instance
     ]
    "Start enqueuing items onto the `source`'s queue to be consumed by a later
    processing stage. This operation should not perform any long-running
    external effects other than database operations via the `transaction`. This
    function is called at most once during a workload's operation.")
  (stop-source!
    ^Unit
    [^Connection transaction  ;; JDBC Connection
     ^Source     source       ;; This source instance
     ]
    "Stop enqueuing inputs onto the `source`'s queue to be consumed by a later
    processing stage. This operation should not perform any long-running
    external effects other than database operations via the `transaction`. This
    function is called at most once during a workload's operation and will only
    be called after `start-source!`. Any outstanding items on the `source`
    queue may still be consumed by a later processing stage.")
  (update-source!
    ^Workload
    [^Workload workload]
    "Enqueue items onto the `workload`'s source queue to be consumed by a later processing
    stage unless stopped, performing any external effects as necessary.
    Implementations should avoid maintaining in-memory state and making long-
    running external calls, favouring internal queues to manage such tasks
    asynchronously between invocations. This function is called one or more
    times after `start-source!` and may be called after `stop-source!`"))
```
Note
The Source protocol is implemented by a set of multimethods of the same
name. The use of a protocol is to illustrate the difference between the
in-memory data model of a Source and the metadata seen by a user.
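The call-order rules in the protocol docstrings (start at most once; stop at most once and only after start; update after start and possibly after stop) can be modelled as a small state machine. This is a hypothetical sketch for reasoning about that contract; the states and `next-source-state` are illustrative and not something WFL is documented to track.

```clojure
(defn next-source-state
  "Advance a hypothetical lifecycle state (:created, :started or :stopped)
  for operation `op` (:start, :update or :stop), throwing an ex-info when the
  documented call order is violated."
  [state op]
  (case [state op]
    [:created :start]  :started   ; start-source! is called at most once
    [:started :update] :started   ; update-source! runs after start-source!
    [:started :stop]   :stopped   ; stop-source! only after start-source!
    [:stopped :update] :stopped   ; update-source! may follow stop-source!
    (throw (ex-info "Source operation out of order" {:state state :op op}))))
```

For example, reducing `[:start :update :update :stop :update]` from `:created` ends in `:stopped`, while calling `:stop` before `:start` throws.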
To be used in a workload,
a Source implementation
should satisfy the processing Stage protocol
and the to-edn multimethod
in addition to the following multimethods
specific to sources:
```clojure
(defmulti create-source
  "Create a `Source` instance using the database `transaction` and configuration
  in the source `request` and return a `[type items]` pair to be written to a
  workload record as `source_type` and `source_items`.
  Notes:
  - This is a factory method registered for workload creation.
  - The `Source` type string must match a value of the `source` enum in the
    database schema.
  - This multimethod is type-dispatched on the `:name` association in the
    `request`."
  (fn ^[^String ^String]
    [^Connection         transaction  ;; JDBC Connection
     ^long               workload-id  ;; ID of the workload being created
     ^IPersistentHashMap request      ;; Data forwarded to the handler
     ]
    (:name request)))

(defmulti load-source!
  "Return the `Source` implementation associated with the `source_type` and
  `source_items` fields of the `workload` row in the database. Note that this
  multimethod is type-dispatched on the `:source_type` association in the
  `workload`."
  (fn ^Source
    [^Connection         transaction  ;; JDBC Connection
     ^IPersistentHashMap workload     ;; Row from workload table
     ]
    (:source_type workload)))
```
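To illustrate how an implementation plugs into this dispatch, the sketch below redeclares simplified stand-ins for the two multimethods (same dispatch functions, no type hints) and registers a hypothetical "Toy Source". Storing EDN in `source_items` is an assumption made only so the round trip is visible; the real `source_items` format is not described here, and a real source type would also need a matching value in the `source` database enum.

```clojure
(ns example.toy-source
  (:require [clojure.edn :as edn]))

;; Stand-ins for WFL's multimethods, dispatching the same way as above.
(defmulti create-source
  (fn [_transaction _workload-id request] (:name request)))

(defmulti load-source!
  (fn [_transaction workload] (:source_type workload)))

;; "Toy Source" is a hypothetical source type used only for illustration.
(defmethod create-source "Toy Source"
  [_transaction workload-id request]
  ;; Return the [source_type source_items] pair persisted on the workload row.
  ["Toy Source" (pr-str {:workload workload-id :items (:items request)})])

(defmethod load-source! "Toy Source"
  [_transaction workload]
  ;; Rebuild the in-memory source from the persisted workload row.
  (assoc (edn/read-string (:source_items workload)) :type "Toy Source"))
```

A request whose `:name` has no registered method fails at dispatch with an `IllegalArgumentException`, which is one reason the type string must agree across the request, the `defmethod`, and the database enum.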