Source
The workload Source models the first stage of a processing pipeline. In a typical workload configuration, a Source can be used to read workflow inputs from a specified location or service in the cloud.
User Guide
You can configure the type of Source used in your workload by changing the source attribute of your workload request.
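For orientation, the source configuration sits alongside the workload's other stage configurations in the request body. The sketch below is illustrative only and elides everything except the stage attributes:

{
  "source": {
    "name": "Terra DataRepo",
    ...
  },
  "executor": { ... },
  "sink": { ... }
}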
Terra DataRepo Source
You can configure workflow-launcher to fetch workflow inputs from a Terra Data Repository (TDR) dataset in real time using the Terra DataRepo source. The Terra DataRepo source polls a dataset table's row metadata table for newly ingested rows and snapshots those rows for downstream processing by an Executor. The Terra DataRepo source can only read inputs from a single table.

When you start the workload, the Terra DataRepo source will start looking for new rows from that instant. When you stop the workload, the Terra DataRepo source will stop looking for new rows from that instant. All pending snapshots may continue to be processed by a later workload stage.
Note
workflow-launcher creates snapshots of your data to be processed by a later stage of the workload. Therefore, you must ensure the account workflow-launcher@firecloud.org is a custodian of your dataset.
A typical Terra DataRepo source configuration in the workload request looks like:
{
  "name": "Terra DataRepo",
  "dataset": "{dataset-id}",
  "table": "{dataset-table-name}",
  "snapshotReaders": [
    "{user}@{domain}",
    ...
  ],
  "pollingIntervalMinutes": 1,
  "loadTag": "{load-tag-to-poll}"
}
Attribute | Description |
---|---|
name | Selects the Terra DataRepo source implementation. |
dataset | The UUID of the dataset to monitor and read from. |
table | The name of the dataset table to monitor and read from. |
snapshotReaders | A list of email addresses to set as readers of all snapshots created in this workload. |
pollingIntervalMinutes | Optional. Rate at which WFL will poll TDR for new rows to snapshot. |
loadTag | Optional. Only snapshot new rows ingested to TDR with loadTag. |
dataset

The UUID uniquely identifying the TDR dataset which WFL will poll for new workflow inputs.
table

The name of the table in dataset which WFL will poll for new workflow inputs. You should design this table such that each row contains all the inputs required to execute a workflow by the workload Executor downstream.
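For example, a row in such a table might carry every input the downstream Executor needs to launch one workflow. This sketch is purely hypothetical: the column names and values are invented for illustration and are not a required schema.

{
  "sample_id": "NA12878",
  "input_cram": "gs://example-bucket/crams/NA12878.cram",
  "reference_fasta": "gs://example-bucket/references/Homo_sapiens_assembly38.fasta",
  "output_path": "gs://example-bucket/outputs/NA12878/"
}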
snapshotReaders

The email addresses of those who should be "readers" of all snapshots created by workflow-launcher in this workload. You can specify individual users and/or Terra/Firecloud groups.
Optional pollingIntervalMinutes

The rate, in minutes, at which WFL will poll TDR for new rows to snapshot. If not provided, the default interval is 20 minutes.
Optional loadTag

WFL will only snapshot new rows ingested to TDR with loadTag. If not provided, all new rows will be eligible for snapshotting.

Note
When initiating a TDR ingest, one can optionally set a load tag. Each row ingested will have this load tag set in its corresponding TDR row metadata table (only visible in BigQuery and not in the TDR UI). The same load tag can be reused across multiple ingests to logically link them. For questions on how to set the load tag for a particular flavor of ingest, contact #dsp-jade.
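As a rough sketch of where a load tag appears, a JSON-format TDR ingest request body might include a load_tag field alongside the target table and source path. The exact fields depend on the ingest flavor, so treat this as illustrative and confirm against the TDR API documentation or #dsp-jade:

{
  "table": "{dataset-table-name}",
  "format": "json",
  "path": "gs://example-bucket/ingest/new-rows.json",
  "load_tag": "{load-tag-to-poll}"
}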
TDR Snapshots Source
You can configure workflow-launcher to use a list of TDR snapshots directly. This may be useful if you don't want workflow-launcher to be a custodian of your dataset or if you already have snapshots you want to process. In this case you must ensure that workflow-launcher@firecloud.org is a reader of all snapshots you want it to process.
A typical TDR Snapshots source configuration in the workload request looks like:
{
  "name": "TDR Snapshots",
  "snapshots": [
    "{snapshot-id}",
    ...
  ]
}
The table below summarises the purpose of each attribute in the above request.
Attribute | Description |
---|---|
name | Selects the TDR Snapshots source implementation. |
snapshots | A list of UUIDs of snapshots to process. |
Note
You must ensure that the snapshots you list are compatible with the downstream processing stage that consumes them.
Developer Guide
A source is a Queue that satisfies the Source protocol below:
(defprotocol Source
  (start-source!
    ^Unit
    [^Connection transaction ;; JDBC Connection
     ^Source     source      ;; This source instance
    ]
    "Start enqueuing items onto the `source`'s queue to be consumed by a later
    processing stage. This operation should not perform any long-running
    external effects other than database operations via the `transaction`. This
    function is called at most once during a workload's operation.")
  (stop-source!
    ^Unit
    [^Connection transaction ;; JDBC Connection
     ^Source     source      ;; This source instance
    ]
    "Stop enqueuing inputs onto the `source`'s queue to be consumed by a later
    processing stage. This operation should not perform any long-running
    external effects other than database operations via the `transaction`. This
    function is called at most once during a workload's operation and will only
    be called after `start-source!`. Any outstanding items on the `source`
    queue may still be consumed by a later processing stage.")
  (update-source!
    ^Workload
    [^Workload workload]
    "Enqueue items onto the `workload`'s source queue to be consumed by a later
    processing stage unless stopped, performing any external effects as
    necessary. Implementations should avoid maintaining in-memory state and
    making long-running external calls, favouring internal queues to manage
    such tasks asynchronously between invocations. This function is called one
    or more times after `start-source!` and may be called after
    `stop-source!`."))
Note
The Source protocol is implemented by a set of multimethods of the same name. The use of a protocol is to illustrate the difference between the in-memory data model of a Source and the metadata seen by a user.
To be used in a workload, a Source implementation should satisfy the processing Stage protocol and the to-edn multimethod in addition to the following multimethods specific to sources:
(defmulti create-source
  "Create a `Source` instance using the database `transaction` and configuration
  in the source `request` and return a `[type items]` pair to be written to a
  workload record as `source_type` and `source_items`.
  Notes:
  - This is a factory method registered for workload creation.
  - The `Source` type string must match a value of the `source` enum in the
    database schema.
  - This multimethod is type-dispatched on the `:name` association in the
    `request`."
  (fn ^[^String ^String]
      [^Connection         transaction ;; JDBC Connection
       ^long               workload-id ;; ID of the workload being created
       ^IPersistentHashMap request     ;; Data forwarded to the handler
      ]
    (:name request)))
(defmulti load-source!
  "Return the `Source` implementation associated with the `source_type` and
  `source_items` fields of the `workload` row in the database. Note that this
  multimethod is type-dispatched on the `:source_type` association in the
  `workload`."
  (fn ^Source
      [^Connection         transaction ;; JDBC Connection
       ^IPersistentHashMap workload    ;; Row from workload table
      ]
    (:source_type workload)))
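For orientation, a minimal sketch of how a new source type might register these multimethods follows. The type strings "My Source" and "MySource" and the helper functions are invented for illustration and are not part of workflow-launcher:

;; Hypothetical sketch: register a new source type with the factory and loader.
(defmethod create-source "My Source"
  [transaction workload-id request]
  ;; Validate the request, persist any per-source state via `transaction`,
  ;; then return the [type items] pair recorded on the workload row.
  (let [items (write-my-source-details! transaction workload-id request)]
    ["MySource" items]))

(defmethod load-source! "MySource"
  [transaction {:keys [source_items] :as _workload}]
  ;; Rehydrate the in-memory source model from the workload row's
  ;; `source_type` and `source_items` columns.
  (read-my-source-details transaction source_items))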