-
-
Notifications
You must be signed in to change notification settings - Fork 987
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
policies/geoip: distance + impossible travel #12541
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
# Generated by Django 5.0.10 on 2025-01-02 20:40 | ||
|
||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
("authentik_policies_geoip", "0001_initial"), | ||
] | ||
|
||
operations = [ | ||
migrations.AddField( | ||
model_name="geoippolicy", | ||
name="check_history_distance", | ||
field=models.BooleanField(default=False), | ||
), | ||
migrations.AddField( | ||
model_name="geoippolicy", | ||
name="check_impossible_travel", | ||
field=models.BooleanField(default=False), | ||
), | ||
migrations.AddField( | ||
model_name="geoippolicy", | ||
name="distance_tolerance_km", | ||
field=models.PositiveIntegerField(default=50), | ||
), | ||
migrations.AddField( | ||
model_name="geoippolicy", | ||
name="history_login_count", | ||
field=models.PositiveIntegerField(default=5), | ||
), | ||
migrations.AddField( | ||
model_name="geoippolicy", | ||
name="history_max_distance_km", | ||
field=models.PositiveBigIntegerField(default=100), | ||
), | ||
migrations.AddField( | ||
model_name="geoippolicy", | ||
name="impossible_tolerance_km", | ||
field=models.PositiveIntegerField(default=100), | ||
), | ||
] |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,15 +4,21 @@ | |||||
|
||||||
from django.contrib.postgres.fields import ArrayField | ||||||
from django.db import models | ||||||
from django.utils.timezone import now | ||||||
from django.utils.translation import gettext as _ | ||||||
from django_countries.fields import CountryField | ||||||
from geopy import distance | ||||||
from rest_framework.serializers import BaseSerializer | ||||||
|
||||||
from authentik.events.context_processors.geoip import GeoIPDict | ||||||
from authentik.events.models import Event, EventAction | ||||||
from authentik.policies.exceptions import PolicyException | ||||||
from authentik.policies.geoip.exceptions import GeoIPNotFoundException | ||||||
from authentik.policies.models import Policy | ||||||
from authentik.policies.types import PolicyRequest, PolicyResult | ||||||
|
||||||
MAX_DISTANCE_HOUR_KM = 1000 | ||||||
|
||||||
|
||||||
class GeoIPPolicy(Policy): | ||||||
"""Ensure the user satisfies requirements of geography or network topology, based on IP | ||||||
|
@@ -21,6 +27,15 @@ | |||||
asns = ArrayField(models.IntegerField(), blank=True, default=list) | ||||||
countries = CountryField(multiple=True, blank=True) | ||||||
|
||||||
distance_tolerance_km = models.PositiveIntegerField(default=50) | ||||||
|
||||||
check_history_distance = models.BooleanField(default=False) | ||||||
history_max_distance_km = models.PositiveBigIntegerField(default=100) | ||||||
history_login_count = models.PositiveIntegerField(default=5) | ||||||
|
||||||
check_impossible_travel = models.BooleanField(default=False) | ||||||
impossible_tolerance_km = models.PositiveIntegerField(default=100) | ||||||
|
||||||
@property | ||||||
def serializer(self) -> type[BaseSerializer]: | ||||||
from authentik.policies.geoip.api import GeoIPPolicySerializer | ||||||
|
@@ -37,21 +52,27 @@ | |||||
- the client IP is advertised by an autonomous system with ASN in the `asns` | ||||||
- the client IP is geolocated in a country of `countries` | ||||||
""" | ||||||
results: list[PolicyResult] = [] | ||||||
static_results: list[PolicyResult] = [] | ||||||
dynamic_results: list[PolicyResult] = [] | ||||||
|
||||||
if self.asns: | ||||||
results.append(self.passes_asn(request)) | ||||||
static_results.append(self.passes_asn(request)) | ||||||
if self.countries: | ||||||
results.append(self.passes_country(request)) | ||||||
static_results.append(self.passes_country(request)) | ||||||
|
||||||
if not results: | ||||||
if self.check_history_distance or self.check_impossible_travel: | ||||||
dynamic_results.append(self.passes_distance(request)) | ||||||
|
||||||
if not static_results and not dynamic_results: | ||||||
return PolicyResult(True) | ||||||
|
||||||
passing = any(r.passing for r in results) | ||||||
messages = chain(*[r.messages for r in results]) | ||||||
passing = any(r.passing for r in static_results) and all(r.passing for r in dynamic_results) | ||||||
messages = chain( | ||||||
*[r.messages for r in static_results], *[r.messages for r in dynamic_results] | ||||||
) | ||||||
|
||||||
result = PolicyResult(passing, *messages) | ||||||
result.source_results = results | ||||||
result.source_results = list(chain(static_results, dynamic_results)) | ||||||
|
||||||
return result | ||||||
|
||||||
|
@@ -73,7 +94,7 @@ | |||||
|
||||||
def passes_country(self, request: PolicyRequest) -> PolicyResult: | ||||||
# This is not a single get chain because `request.context` can contain `{ "geoip": None }`. | ||||||
geoip_data = request.context.get("geoip") | ||||||
geoip_data: GeoIPDict | None = request.context.get("geoip") | ||||||
country = geoip_data.get("country") if geoip_data else None | ||||||
|
||||||
if not country: | ||||||
|
@@ -87,6 +108,42 @@ | |||||
|
||||||
return PolicyResult(True) | ||||||
|
||||||
def passes_distance(self, request: PolicyRequest) -> PolicyResult: | ||||||
"""Check if current policy execution is out of distance range compared | ||||||
to previous authentication requests""" | ||||||
# Get previous login event and GeoIP data | ||||||
previous_logins = Event.objects.filter( | ||||||
action=EventAction.LOGIN, user__pk=request.user.pk, context__geo__isnull=False | ||||||
).order_by("-created")[: self.history_login_count] | ||||||
_now = now() | ||||||
geoip_data: GeoIPDict | None = request.context.get("geoip") | ||||||
if not geoip_data: | ||||||
return PolicyResult(False) | ||||||
for previous_login in previous_logins: | ||||||
previous_login_geoip: GeoIPDict = previous_login.context.get("geo") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
We've checked previously. I'd rather it fails on that line if something went wrong, than a |
||||||
|
||||||
# Figure out distance | ||||||
dist = distance.geodesic( | ||||||
(previous_login_geoip["lat"], previous_login_geoip["long"]), | ||||||
(geoip_data["lat"], geoip_data["long"]), | ||||||
) | ||||||
if self.check_history_distance and dist.km >= ( | ||||||
self.history_max_distance_km - self.distance_tolerance_km | ||||||
): | ||||||
return PolicyResult( | ||||||
False, _("Distance from previous authentication is larger than threshold.") | ||||||
) | ||||||
# Check if distance between `previous_login` and now is more | ||||||
# than max distance per hour times the amount of hours since the previous login | ||||||
# (round down to the lowest closest time of hours) | ||||||
# clamped to be at least 1 hour | ||||||
rel_time_hours = max(int((_now - previous_login.created).total_seconds() / 86400), 1) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
an hour is 60 secs * 60 mins = 3600 seconds, right? |
||||||
if self.check_impossible_travel and dist.km >= ( | ||||||
(MAX_DISTANCE_HOUR_KM * rel_time_hours) - self.distance_tolerance_km | ||||||
): | ||||||
return PolicyResult(False, _("Distance is further than possible.")) | ||||||
return PolicyResult(True) | ||||||
|
||||||
class Meta(Policy.PolicyMeta): | ||||||
verbose_name = _("GeoIP Policy") | ||||||
verbose_name_plural = _("GeoIP Policies") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any advantage in having two separate lists here?