Trigger Dag at a specific execution date from Airflow WebUI (version: 1.10.9)

Kaka.Go
3 min readApr 19, 2020

We can trigger a dag run by CLI or REST API with the execution date argument or parameter.

CLI

$ airflow trigger_dag -e '2020-04-19T04:00:00' <DAG_ID>

REST API

curl -X POST \
http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' \
-d '{"execution_date": "2020-04-19T04:00:00"}'

Web GUI

However, on the Web GUI, we can only trigger the dag now.

Can we trigger the dag at a specific date-time?

We can change a little bit in the source code and here is my solution.

input an execution date before trigger the DAG
  1. change the confirmTriggerDag(..) method in dag.html and dags.html
  • <venv_path>/lib/python3.7/site-packages/airflow/www/templates/airflow/dag.html
  • <venv_path>/lib/python3.7/site-packages/airflow/www/templates/airflow/dags.html
function confirmTriggerDag(link, dag_id){
var execution_date = prompt("Please enter execution date (ex: 2020-04-19T00:00:00)");
// when click 'Cancel'
if (execution_date == null){
return false;
}

// when input nothing and click 'OK', trigger the dag now.
if (!execution_date){
if (confirm("Are you sure you want to run '"+dag_id+"' now ?")) {
postAsForm(link.href, {execution_date: ''});
}
return false;
}
// when input a invalid execution_date and click 'OK'
// https://stackoverflow.com/questions/3143070/javascript-regex-iso-datetime
dt_regex = /\d{4}-[01]\d-[0-3]\dT[0-2]\d:[0-5]\d:[0-5]\d([+-][0-2]\d:[0-5]\d|Z)?/
if (!dt_regex.test(execution_date)){
alert("Invalid execution_date format");
return false;
}
// when input a valid execution_date and click 'OK'
if (confirm("Are you sure you want to run '"+dag_id+"' " + execution_date + " ?")) {
postAsForm(link.href, {execution_date: execution_date});
}
// Never follow the link
return false;
}

2. change the trigger(..) method in views.py

  • <venv_path>/lib/python3.7/site-packages/airflow/www/views.py
@expose('/trigger', methods=['POST'])
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
@provide_session
def trigger(self, session=None):
dag_id = request.values.get('dag_id')
execution_date_str = request.values.get('execution_date') # handle the execution_date parameter
origin = request.values.get('origin') or "/admin/"
dag = session.query(models.DagModel).filter(models.DagModel.dag_id == dag_id).first()
if not dag:
flash("Cannot find dag {}".format(dag_id))
return redirect(origin)
if execution_date_str == "":
execution_date = timezone.utcnow()

else:
try:
execution_date = pendulum.parse(execution_date_str)
except:
flash("Invalid date string: {}".format(execution_date_str))
return redirect(origin)


run_id = "manual__{0}".format(execution_date.isoformat())
dr = DagRun.find(dag_id=dag_id, run_id=run_id)
if dr:
flash("This run_id {} already exists".format(run_id))
return redirect(origin)
run_conf = {} try:
dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
state=State.RUNNING,
conf=run_conf,
external_trigger=True
)
except:
pass
flash(
"Triggered {} {}, "
"it should start any moment now.".format(dag_id, execution_date))

return redirect(origin)

Sample

  • trigger DAG at 2019–12–12T12:00:00
check the execution

※ The DAG won’t start if you trigger DAG before the start date.

※ You can use backfill the DAG before the start date, but it can only trigger DAG at the scheduled time.

# DAG schedule_interval defined
$ airflow backfill -s '2020-04-19' -e '2020-04-19' <DAG_ID>

ref

--

--