One of the issues I’ve come across with AWS WAF is that it can be incredibly difficult, tedious and time-consuming to tailor rules that block all requests that should be blocked without occasionally, inadvertently blocking perfectly valid requests to the platform.
This can cause a lot of headaches, so I came up with a custom solution (written in Python, provisioned as a Lambda function) that does the following:
- Grabs all blocked requests within the past 2 hours
- Checks the request endpoint against a list of valid platform endpoints
- Alerts Slack for visibility if there is a match
Okay, it’s not a perfect solution. There’s still a requirement to go and manually check blocked requests that alert in Slack. But it’s a good start to getting those rules refined, and gives a lot more visibility and peace of mind in the event of valid requests being blocked.
The solution sounds simple enough, but there’s some trickery around comparing the request URIs from WAF against my own list of valid platform endpoints. You’ll see what I mean soon! So, let’s break down the functionality of the script, step by step.
First, we need a function that grabs the Web ACL. This is the easy bit. We’ll reference this function in the next step.
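# waf_client is assumed to be a boto3 WAFV2 client, e.g. boto3.client('wafv2')
def get_web_acl(waf_name, waf_id):
    # Pass the name and ID as plain strings; {waf_name} would build a set
    return waf_client.get_web_acl(Name=waf_name, Scope='REGIONAL', Id=waf_id)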
Now, we need to grab all of the blocked request URIs. I’ll have this as a function that returns a list of all our blocked requests. There’s a bit going on here, so I’ll break each part down and explain what’s happening.
def get_blocked_uris(waf_arn, waf_name, waf_id):
    current_datetime = datetime.now()
    all_rule_metrics = []
    web_acls = get_web_acl(waf_name, waf_id)
As this is a Lambda, I pass the WAF ARN, name and ID through as environment variables. We record the current time (used later for the 2-hour window), create an empty list to store rule metrics, and grab our Web ACL via the function in the previous step.
    for rule_metric_name in web_acls['WebACL']['Rules']:
        if rule_metric_name['Statement']['ManagedRuleGroupStatement']['VendorName'] == 'AWS':
            all_rule_metrics.append(rule_metric_name['Name'])
Here, we’re looping through all of the Web ACL rules and grabbing the AWS managed rules that are applied to it. This is because I happen to only have AWS managed rules in place, and I want to make sure I’m capturing blocked requests for all of those rule metrics.
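The lookup above relies on every rule carrying a ManagedRuleGroupStatement. If a custom rule were ever added to the Web ACL, that key lookup would raise a KeyError. Below is a minimal sketch of a more defensive variant. The helper name aws_managed_rule_names is hypothetical, and the sample dictionary is hand-built to resemble only the parts of the get_web_acl response used in this post.

```python
def aws_managed_rule_names(web_acl_response):
    """Return the names of rules backed by an AWS managed rule group."""
    names = []
    for rule in web_acl_response["WebACL"]["Rules"]:
        # Custom rules have no ManagedRuleGroupStatement, so fall back to {}.
        managed = rule.get("Statement", {}).get("ManagedRuleGroupStatement", {})
        if managed.get("VendorName") == "AWS":
            names.append(rule["Name"])
    return names


# Hand-built sample (an assumption, not a real API response).
sample_response = {
    "WebACL": {
        "Rules": [
            {
                "Name": "AWSManagedRulesCommonRuleSet",
                "Statement": {
                    "ManagedRuleGroupStatement": {
                        "VendorName": "AWS",
                        "Name": "AWSManagedRulesCommonRuleSet",
                    }
                },
            },
            {
                # Simplified custom rule, included only to show it is skipped.
                "Name": "custom-rate-limit",
                "Statement": {"RateBasedStatement": {"Limit": 1000}},
            },
        ]
    }
}

print(aws_managed_rule_names(sample_response))
```

With only AWS managed rules in place, as in my setup, this behaves the same as the original loop.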
    all_request_uris = []
    for metric in all_rule_metrics:
        all_blocked_requests = waf_client.get_sampled_requests(
            WebAclArn=waf_arn,
            RuleMetricName=f'{waf_name}-AWS-{metric}',
            Scope='REGIONAL',
            TimeWindow={
                'StartTime': current_datetime - timedelta(hours=2),
                'EndTime': current_datetime
            },
            MaxItems=500  # required by the API; 500 is the maximum sample size
        )
        for request in all_blocked_requests['SampledRequests']:
            request_to_append = request["Request"]["URI"]
            if request_to_append not in all_request_uris:
                all_request_uris.append(request_to_append)
    return all_request_uris
Here, we first initialise an empty list. This will be where we store our final list of blocked requests. Then, we loop through our AWS managed rule metrics and store all blocked requests for that metric as all_blocked_requests. We then loop through all of the requests in all_blocked_requests and grab the request URI (we’re not interested in any other request data captured). If the request isn’t already in our all_request_uris list, then we add it. Finally, we return the list.
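As an aside, the "not in list" check scans the whole list each time. That is fine for a few hundred sampled requests, but the same de-duplication can be sketched with a dictionary, which keeps first-seen order. The helper name unique_uris and the fake responses below are made up for illustration; they only mimic the SampledRequests / Request / URI keys used above.

```python
def unique_uris(sampled_responses):
    """Collect request URIs from several responses, dropping duplicates."""
    seen = {}
    for response in sampled_responses:
        for sample in response.get("SampledRequests", []):
            # dict keys keep insertion order, so first-seen order is preserved.
            seen.setdefault(sample["Request"]["URI"], None)
    return list(seen)


# Fake responses standing in for what get_sampled_requests returns.
fake_responses = [
    {
        "SampledRequests": [
            {"Request": {"URI": "/user/jack/profile"}},
            {"Request": {"URI": "/wp-login.php"}},
        ]
    },
    {"SampledRequests": [{"Request": {"URI": "/user/jack/profile"}}]},
]

print(unique_uris(fake_responses))
```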
It’s worth noting that the URI will be the path of the request. For example, if the request was sent to https://dynadev.co.uk/random/example/endpoint.php then the URI we capture would be /random/example/endpoint.php.
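To make that concrete, here is a small illustration using Python’s standard urllib.parse module. The Lambda itself never needs this, since WAF already hands us the path; the helper name path_of is just for this example.

```python
from urllib.parse import urlsplit


def path_of(url):
    """Return only the path component of a full URL."""
    return urlsplit(url).path


print(path_of("https://dynadev.co.uk/random/example/endpoint.php"))
# /random/example/endpoint.php
```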
So, now that we have our list of blocked requests from WAF, we need to compare them against our own list of valid endpoints. This is where the tricky part comes in, as the paths can have dynamic values.
For example, what if the path is /user/jack/profile? In that case, jack could be any username, so we need to account for this in our own list. Most backend application frameworks provide a way of listing these paths (or routes). In Symfony for example, there is the debug:router command. The output for our example path above would then be something like this: /user/{username}/profile
So how do we compare those two paths (one with a static value, the other with a dynamic variable)? Well, we can break the paths down into components, and simply ignore checking the dynamic components altogether. If you’re not following, let me explain better by jumping into the code.
Let’s create a function to handle the check and pass our blocked URIs to it:
import re

def check_against_valid_uri_list(blocked_uris):
    with open('/tmp/endpoints.txt') as platform_paths:
        list_of_checked_valid_paths = []
        for endpoint in platform_paths:
            for uri in blocked_uris:
                # Drop the leading '/' and split each path into its components
                platform_path_split = endpoint.strip()[1:].split('/')
                blocked_uri_split = uri.strip()[1:].split('/')
                new_platform_path_split = []
                new_blocked_uri_split = []
                if (len(platform_path_split) == len(blocked_uri_split)):
                    for i in range(len(platform_path_split)):
                        # Keep only the static components; skip {dynamic} ones
                        if not re.match('{[^}]*}', platform_path_split[i]):
                            new_platform_path_split.append(platform_path_split[i])
                            new_blocked_uri_split.append(blocked_uri_split[i])
                        else:
                            continue
                    if new_platform_path_split == new_blocked_uri_split and uri not in list_of_checked_valid_paths:
                        list_of_checked_valid_paths.append(uri)
    # Return the matches so they can be sent on to Slack
    return list_of_checked_valid_paths
Firstly, we open our list of valid platform endpoints (/tmp/endpoints.txt), stored in /tmp/ because it’s running in a Lambda function. We then loop through each valid endpoint, and for each valid endpoint, we loop through our list of blocked requests to see if there is a match.
platform_path_split = endpoint.strip()[1:].split('/')
blocked_uri_split = uri.strip()[1:].split('/')
new_platform_path_split = []
new_blocked_uri_split = []
This section splits both our valid path and blocked path by the ‘/’ character. The result is stored as a list of strings that make up the path.
if (len(platform_path_split) == len(blocked_uri_split)):
    for i in range(len(platform_path_split)):
        if not re.match('{[^}]*}', platform_path_split[i]):
            new_platform_path_split.append(platform_path_split[i])
            new_blocked_uri_split.append(blocked_uri_split[i])
        else:
            continue
Here, we first check the length of both lists. If they’re not the same length then we know they aren’t a match, so we can move on to the next blocked URI.
If the lengths match, then we loop through the components of the valid endpoint to check for any dynamic variables. Since these will always be enclosed in {} brackets, we can do a simple regex match. We then add each component of the paths to two new lists, omitting any components that are flagged as dynamic.
For example, if /user/{username}/profile is being compared to /user/jack/profile, both of these paths will be broken down into two lists:
/user/{username}/profile = [user, {username}, profile]
/user/jack/profile = [user, jack, profile]
During the loop, once we hit {username}, regex detects a match, so the item at this index on both lists is skipped when we compile our new lists. The new lists will then be [user, profile].
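The worked example can be run on its own. The sketch below pulls the comparison for a single pair of paths into a standalone helper; the name static_segments is hypothetical, and I’ve escaped the braces in the regex for readability, which matches the same strings as the pattern used in the function.

```python
import re

# A path component that starts with {placeholder} is treated as dynamic.
DYNAMIC_SEGMENT = re.compile(r"\{[^}]*\}")


def static_segments(template, uri):
    """Return (template_parts, uri_parts) with dynamic positions dropped.

    Returns None when the two paths have a different number of components.
    """
    template_parts = template.strip()[1:].split("/")
    uri_parts = uri.strip()[1:].split("/")
    if len(template_parts) != len(uri_parts):
        return None
    kept_template, kept_uri = [], []
    for template_part, uri_part in zip(template_parts, uri_parts):
        if DYNAMIC_SEGMENT.match(template_part):
            continue  # skip this index in both lists
        kept_template.append(template_part)
        kept_uri.append(uri_part)
    return kept_template, kept_uri


print(static_segments("/user/{username}/profile", "/user/jack/profile"))
# (['user', 'profile'], ['user', 'profile'])
print(static_segments("/user/{username}/profile", "/user/jack/settings"))
# (['user', 'profile'], ['user', 'settings'])
print(static_segments("/user/{username}/profile", "/user/jack"))
# None
```

The first pair produces two identical lists, so it counts as a match; the second differs in its last component, and the third is rejected on length alone.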
if new_platform_path_split == new_blocked_uri_split and uri not in list_of_checked_valid_paths:
    list_of_checked_valid_paths.append(uri)
We then do a final check against our two new lists, and we add our blocked request URI to our list_of_checked_valid_paths (if it isn’t already there).
We’re then left with a list of all blocked request URIs that match against valid platform paths, which we can send via webhook to Slack as an alert.
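I haven’t shown the Slack step, so here is one possible sketch using only the standard library. It assumes a Slack incoming webhook, which accepts a JSON body with a "text" field. The function names, the message wording and the webhook_url argument are all assumptions for illustration; in a Lambda the URL would typically come from an environment variable.

```python
import json
import urllib.request


def build_slack_payload(matched_uris):
    """Build the JSON body for a Slack incoming webhook message."""
    lines = "\n".join(f"- {uri}" for uri in matched_uris)
    return {
        "text": (
            f"WAF blocked {len(matched_uris)} request path(s) "
            f"that match valid platform endpoints:\n{lines}"
        )
    }


def send_to_slack(webhook_url, matched_uris):
    """POST the alert to Slack; does nothing if there are no matches."""
    if not matched_uris:
        return None
    body = json.dumps(build_slack_payload(matched_uris)).encode("utf-8")
    request = urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Skipping the call when the list is empty keeps the channel quiet on runs where nothing valid was blocked.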