Trigger Dag at a specific execution date from Airflow WebUI (version: 1.10.9)
We can trigger a dag run by CLI or REST API with the execution date argument or parameter.
CLI
$ airflow trigger_dag -e '2020-04-19T04:00:00' <DAG_ID>
REST API
curl -X POST \
http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' \
-d '{"execution_date": "2020-04-19T04:00:00"}'
Web GUI
However, on the Web GUI, we can only trigger the dag now.
We can change a little bit in the source code and here is my solution.
- change the confirmTriggerDag(..) method in dag.html and dags.html
- <venv_path>/lib/python3.7/site-packages/airflow/www/templates/airflow/dag.html
- <venv_path>/lib/python3.7/site-packages/airflow/www/templates/airflow/dags.html
function confirmTriggerDag(link, dag_id){
var execution_date = prompt("Please enter execution date (ex: 2020-04-19T00:00:00)");// when click 'Cancel'
if (execution_date == null){
return false;
}
// when input nothing and click 'OK', trigger the dag now.
if (!execution_date){
if (confirm("Are you sure you want to run '"+dag_id+"' now ?")) {
postAsForm(link.href, {execution_date: ''});
}
return false;
}// when input a invalid execution_date and click 'OK'
// https://stackoverflow.com/questions/3143070/javascript-regex-iso-datetime
dt_regex = /\d{4}-[01]\d-[0-3]\dT[0-2]\d:[0-5]\d:[0-5]\d([+-][0-2]\d:[0-5]\d|Z)?/
if (!dt_regex.test(execution_date)){
alert("Invalid execution_date format");
return false;
}// when input a valid execution_date and click 'OK'
if (confirm("Are you sure you want to run '"+dag_id+"' " + execution_date + " ?")) {
postAsForm(link.href, {execution_date: execution_date});
}
// Never follow the link
return false;
}
2. change the trigger(..) method in views.py
- <venv_path>/lib/python3.7/site-packages/airflow/www/views.py
@expose('/trigger', methods=['POST'])
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
@provide_session
def trigger(self, session=None):
dag_id = request.values.get('dag_id')
execution_date_str = request.values.get('execution_date') # handle the execution_date parameter origin = request.values.get('origin') or "/admin/"
dag = session.query(models.DagModel).filter(models.DagModel.dag_id == dag_id).first()
if not dag:
flash("Cannot find dag {}".format(dag_id))
return redirect(origin) if execution_date_str == "":
execution_date = timezone.utcnow()
else:
try:
execution_date = pendulum.parse(execution_date_str)
except:
flash("Invalid date string: {}".format(execution_date_str))
return redirect(origin)
run_id = "manual__{0}".format(execution_date.isoformat()) dr = DagRun.find(dag_id=dag_id, run_id=run_id)
if dr:
flash("This run_id {} already exists".format(run_id))
return redirect(origin) run_conf = {} try:
dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
state=State.RUNNING,
conf=run_conf,
external_trigger=True
)
except:
pass flash(
"Triggered {} {}, "
"it should start any moment now.".format(dag_id, execution_date))
return redirect(origin)
Sample
- trigger DAG at 2019–12–12T12:00:00
※ The DAG won’t start if you trigger DAG before the start date.
※ You can use backfill the DAG before the start date, but it can only trigger DAG at the scheduled time.
# DAG schedule_interval defined
$ airflow backfill -s '2020-04-19' -e '2020-04-19' <DAG_ID>