Adding alert on file system modification events
Let’s setup alert that will send email to administrator when some executable script will be modified on Web server under user’s file system space. We will run scheduled search every 5 minutes to scan last 5 minutes worth of modifications.
- On a Splunk indexer, login into Splunk.
- Go to: Settings -> Searches, Reports and Alerts. Press [New].
- Destination app: “Search & Reporting (search)”.
- Search Name: “Scripts Added/Modified”.
- Search, paste this:
index=fs _index_earliest=-5m@m | eval indextime=strftime(_indextime, "%Y-%m-%d %H:%M:%S") | dedup source | table _time, indextime, source
- Schedule type: “Cron”
- Cron Schedule: */5 * * * *
- Run as (*) Owner
- Alert, Condition: “if number of events”, “is greater than”, 0
- Alert mode: “Once per search”
- Expiration: “After 24 hours”
- Send email: Enable
- To: – specify your email address. Also – make sure to configure email settings here (set Splunk server name or IP to yours):
http://12.34.5.67:8000/en-US/manager/search/admin/alert_actions/email?action=edit- List in triggered alerts: [x] Enable
- [Save]
This will make Splunk to run query to check for fresh scripts under user’s filespaces every 5 minutes. If changes are detected – administrator will receive email.
Please note that this is more like a demo alert that has nothing to do with malware but shows how to take advantage of established pipeline between Web Server+Splunk forwarder and destination server – Splunk indexer/search head.
Within this search query you might want to add condition to exclude certain frequently modified scripts – depending on your hosting specifics.
It is also important to note that instead of earliest=-5m@m we use _index_earliest=-5m@m
This is required to detect renamed files with this query as well as to detect files that has date/time set in the past.
Because we are using files as source of “events” – and they do not have any “time” fields within them – Splunk will use date/time stamp of an actual file. So to detect these we use _index_earliest condition – which is the indexing time of event.Here comes the main juice:
Adding alert on suspicious pattern match within freshly added or modified files
The procedure to create an alert to detect malware infection is almost exactly the same as previous alert. We also make it run and send email alert every 5 minutes. The only differences here are:
- Search Name – enter here “Suspicious Pattern match detected”.
- The search query itself.
The task this alert will do will be substantially more complicated and hence the search itself is way more involved.
The query will:
- Search for any files that was added, renamed or modified within the last 5 minutes.
- Search for typical patterns of malware within these files and assign malwarescore to each occurence of found.
Only files with malwarescore above zero will be passed to further analysis. - Assemble malwaremsg – text message explaining why risk score was assign to the given file.
- Access httpdconf index to pull information about user account that given file belongs to and assembles search regex search pattern into sitespec field. Sitespec will include all possible web site that user account has configured for in regex search patter format, like this:
“www.example.com|example.com|subdomain1.example.com” - Use sitespec pattern to find all sources of WEB traffic that possibly caused malware infection. The search will tap into apache_* indexes and will use previously defined time range: starttimesearch to endtimesearch. The web traffic search range is set to ‘indextime’-20 seconds till ‘indextime’+5 seconds. Which means: find me all web traffic to this client’s account that happened within 20 second of file date/time (or more precisely of file indextime).
- Display the results.
To summarize all above – we are scanning all recent file system modifications for malware patterns and for each match we show filename and IP addresses that might caused these modifications.
Email alert is generated when 1 or more results are found.The information presented in a single table will be enough to immediately delete or disable malware.
Further, simple search on found IP addresses will allow to trace back attacker’s steps and see exactly what he was doing before penetration occurred.
This will allow administrator to:
- Ban attacker’s IP address or CIDR range to suspend further malicious traffic.
- Quickly discover the ways successful attack was deployed with the simple query (substitute attacker’s IP):
index=apache_* clientip=x.xx.xxx.xxx | dedup _time, clientip, site, method, page | table _time, clientip, method, status, site, page, useragent - Close discovered vulnerabilities.
- Delete or disable detected malware.
- It is also recommended to scan results of “Scripts Added/Modified” alert to see if there are any modifications that did not trigger malware detection logic.
The approach and query presented will allow close to real time alerting on malware penetration including previously unknown malware.
Here is the final query:
index=fs _index_earliest=-5m@m source!=*some-file.php | regex source!="/some-folder/" | eval malwarescore = 0 | eval malwaremsg = "" | eval separator = "|" | eval addscore = if (match(_raw, "[^\r\n\s]{200,}") AND NOT match(source, "\.js$"), 10, 0) | eval addmsg = "[+"+addscore+"] "+"Very long line(s)" | eval malwarescore = malwarescore + addscore | eval malwaremsg = malwaremsg + if (addscore>0, separator+addmsg, "") | eval addscore = if (match(_raw, "[a-zA-Z0-9\+/]{75,}") AND NOT match(source, "\.js$"), 5, 0) | eval addmsg = "[+"+addscore+"] "+"Tight line(s)" | eval malwarescore = malwarescore + addscore | eval malwaremsg = malwaremsg + if (addscore>0, separator+addmsg, "") | eval addscore = if (match(_raw, "_FILES"), 10, 0) | eval addmsg = "[+"+addscore+"] "+"_FILES" | eval malwarescore = malwarescore + addscore | eval malwaremsg = malwaremsg + if (addscore>0, separator+addmsg, "") | eval addscore = if (match(_raw, "(?i)(\\\\[a-z0-9_]{3,}){5,}"), 10, 0) | eval addmsg = "[+"+addscore+"] "+"\\NNN encoded entities" | eval malwarescore = malwarescore + addscore | eval malwaremsg = malwaremsg + if (addscore>0, separator+addmsg, "") | eval addscore = if (match(_raw, "(?i)(\$[a-z0-9_]+\[\d+\]\s*\.\s*){5,}"), 10, 0) | eval addmsg = "[+"+addscore+"] "+"\\$XXX[NN] encoded entities" | eval malwarescore = malwarescore + addscore | eval malwaremsg = malwaremsg + if (addscore>0, separator+addmsg, "") | eval addscore = if (match(_raw, "eval\s*\("), 15, 0) | eval addmsg = "[+"+addscore+"] "+"eval" | eval malwarescore = malwarescore + addscore | eval malwaremsg = malwaremsg + if (addscore>0, separator+addmsg, "") | eval addscore = if (match(_raw, "gzinflate\s*\("), 10, 0) | eval addmsg = "[+"+addscore+"] "+"gzinflate" | eval malwarescore = malwarescore + addscore | eval malwaremsg = malwaremsg + if (addscore>0, separator+addmsg, "") | eval addscore = if (match(_raw, "base64_decode\s*\("), 12, 0) | eval addmsg = "[+"+addscore+"] "+"base64_decode" | eval malwarescore = malwarescore + addscore | eval malwaremsg = malwaremsg + if (addscore>0, separator+addmsg, "") | eval addscore = if (match(_raw, "symlink\s*\("), 10, 0) | eval addmsg = "[+"+addscore+"] "+"symlink" | eval malwarescore = malwarescore + addscore | eval malwaremsg = malwaremsg + if (addscore>0, separator+addmsg, "") | eval addscore = if (match(_raw, "46esab"), 16, 0) | eval addmsg = "[+"+addscore+"] "+"46esab" | eval malwarescore = malwarescore + addscore | eval malwaremsg = malwaremsg + if (addscore>0, separator+addmsg, "") | eval addscore = if (match(_raw, "edoced"), 16, 0) | eval addmsg = "[+"+addscore+"] "+"edoced" | eval malwarescore = malwarescore + addscore | eval malwaremsg = malwaremsg + if (addscore>0, separator+addmsg, "") | eventstats min(_indextime) as indextime by source | eval starttimesearch=indextime-25 | eval endtimesearch=indextime+5 | dedup malwaremsg, source | eventstats sum(malwarescore) as malwarescore by source | where malwarescore>0 | makemv delim="|" malwaremsg | eventstats values(malwaremsg) AS malwaremsg by source | eval malwaremsg=mvjoin(malwaremsg, "|") | dedup source | rex field=source "/home/(?<account>[^/]+)" | sort -malwarescore, _indextime | eval indextime=strftime(indextime, "%Y-%m-%d %H:%M:%S") | join account [search index=httpdconf | eval x="Pull sitespec to search for relevant traffic" | rex "ServerName\s+(?<server_name>.*)" | rex "ServerAlias\s+(?<server_alias>.*)" | makemv delim=" " server_alias | rex "DocumentRoot\s+(?<document_root>[^\s]+)" | rex field=document_root "/home/(?<account>[^/]+)" | eventstats values(server_alias) as aliases, values(server_name) as sites by account | eval sites=mvappend(aliases, sites) | eval sites=mvdedup(sites) | eval sitespec="(" + mvjoin(sites, "|") + ")" | dedup account | fields account, sitespec ] | append [ | stats c | eval x="This is just a filler to prevent map from failing when there are no events" | eval starttimesearch=0 | eval endtimesearch=0 | eval indextime=0 | eval malwaremsg="" | eval malwarescore="" | eval account="" | eval source="" | fields starttimesearch, endtimesearch, indextime, malwaremsg, malwarescore, account, source ] | map search=" | search index=apache_* sourcetype=access_combined* starttimeu::$starttimesearch$ endtimeu::$endtimesearch$ | where match(site, $sitespec$) | eval account=\"$account$\" | eval indextime=$indextime$ | eval malwaremsg=\"$malwaremsg$\" | eval malwarescore=$malwarescore$ | eval source=\"$source$\" | regex uri_path=\"(/[^\.]+|\.php)$\" | iplocation allfields=true clientip | eval Country=if(Country=\"United States\", \"USA\", Country) | eval Region=if(len(Region)>0, Region, \"\") | eval City=if(len(City)>0, City, \"\") | eval clientipex=clientip+\" (\"+Country+\"-\"+Region+\"-\"+City+\")[\" + useragent + \"]\" | eval uriex = \"[\" + strftime(_time, \"%Y-%m-%d %H:%M:%S\") + \"] [\" + clientip + \"][\" + method + \"][\" + status + \"]:\" + uri | eventstats values(clientipex) as IPs, values(uriex) as URIs by source | eval URIs=mvsort(URIs) | append [ | stats c | eval x=\"Append original record in case no traffic will be found. if not traffic - this will still appear. If yes - it will be dedup-ed out\" | eval indextime=$indextime$ | eval IPs=\"No traffic detected\" | eval URIs=\"-----\" | eval account=\"$account$\" | eval malwaremsg=\"$malwaremsg$\" | eval malwarescore=$malwarescore$ | eval source=\"$source$\" | fields indextime, IPs, URIs, account, malwaremsg, malwarescore, source ] | dedup source | makemv delim=\"|\" malwaremsg " maxsearches=100 | where len(source)>0 | table indextime, IPs, URIs, account, malwaremsg, malwarescore, source
Here are step by step explanations:
index=fs _index_earliest=-5m@m source!=*some-file.php | regex source!="/some-folder/" | eval malwarescore = 0 | eval malwaremsg = "" | eval separator = "|" | eval addscore = if (match(_raw, "[^\r\n\s]{200,}") AND NOT match(source, "\.js$"), 10, 0) | eval addmsg = "[+"+addscore+"] "+"Very long line(s)" | eval malwarescore = malwarescore + addscore | eval malwaremsg = malwaremsg + if (addscore>0, separator+addmsg, "") ...
This fragment scans for any files that were modified within the last 5 minutes, excluding some files that are caused of frequent false positives. Keep this whitelist private!
Malwarescore is calculated to every detected fragment and malwaremsg initialized.
| eventstats min(_indextime) as indextime by source | eval starttimesearch=indextime-25 | eval endtimesearch=indextime+5 | dedup malwaremsg, source | eventstats sum(malwarescore) as malwarescore by source | where malwarescore>0 | makemv delim="|" malwaremsg | eventstats values(malwaremsg) AS malwaremsg by source | eval malwaremsg=mvjoin(malwaremsg, "|") | dedup source | rex field=source "/home/(?<account>[^/]+)" | sort -malwarescore, _indextime | eval indextime=strftime(indextime, "%Y-%m-%d %H:%M:%S")
Calculate timerange to search for within web traffic.
Sum mawlarescore by source (file-name). Filter out zero scored files.
Extract user account name. Eliminate duplicates for malwaremsg and make it “|”-separated string. Later on it will be reassembled as multivalued field for display purposes.
Assemble indextime as readable string – otherwise it won’t show up.
| join account [search index=httpdconf | eval x="Pull sitespec to search for relevant traffic" | rex "ServerName\s+(?<server_name>.*)" | rex "ServerAlias\s+(?<server_alias>.*)" | makemv delim=" " server_alias | rex "DocumentRoot\s+(?<document_root>[^\s]+)" | rex field=document_root "/home/(?<account>[^/]+)" | eventstats values(server_alias) as aliases, values(server_name) as sites by account | eval sites=mvappend(aliases, sites) | eval sites=mvdedup(sites) | eval sitespec="(" + mvjoin(sites, "|") + ")" | dedup account | fields account, sitespec ] | append [ | stats c | eval x="This is just a filler to prevent map from failing when there are no events" ... ]
This fragment takes account field and taps into httpdconf index to find names of all domains and subdomains that belongs to this account. Regex search pattern is assembled here: | eval sitespec=”(” + mvjoin(sites, “|”) + “)”. Later on this pattern will be used to tap into apache_* indexes to find relevant (only for this user account) traffic – attacking IP address that tried to deliver the malware.
| map search=" | search index=apache_* sourcetype=access_combined* starttimeu::$starttimesearch$ endtimeu::$endtimesearch$ | where match(site, $sitespec$) | eval account=\"$account$\" | eval indextime=$indextime$ | eval malwaremsg=\"$malwaremsg$\" | eval malwarescore=$malwarescore$ | eval source=\"$source$\" | regex uri_path=\"(/[^\.]+|\.php)$\" | iplocation allfields=true clientip | eval Country=if(Country=\"United States\", \"USA\", Country) | eval Region=if(len(Region)>0, Region, \"\") | eval City=if(len(City)>0, City, \"\") | eval clientipex=clientip+\" (\"+Country+\"-\"+Region+\"-\"+City+\")[\" + useragent + \"]\" | eval uriex = \"[\" + strftime(_time, \"%Y-%m-%d %H:%M:%S\") + \"] [\" + clientip + \"][\" + method + \"][\" + status + \"]:\" + uri | eventstats values(clientipex) as IPs, values(uriex) as URIs by source | eval URIs=mvsort(URIs) | append [ | stats c | eval x=\"Append original record in case no traffic will be found. if not traffic - this will still appear. If yes - it will be dedup-ed out\" | eval indextime=$indextime$ | eval IPs=\"No traffic detected\" | eval URIs=\"-----\" | eval account=\"$account$\" | eval malwaremsg=\"$malwaremsg$\" | eval malwarescore=$malwarescore$ | eval source=\"$source$\" | fields indextime, IPs, URIs, account, malwaremsg, malwarescore, source ] | dedup source | makemv delim=\"|\" malwaremsg " maxsearches=100 | where len(source)>0 | table indextime, IPs, URIs, account, malwaremsg, malwarescore, source
The map search pattern allows to execute custom search per each given event.
What we are accomplishing here is this:
for each event that generated non zero malwarescore – scan web traffic of the specific client account and find an attacking IP address(es). We just scan the last 20 seconds (defined by starttimesearch / endtimesearch range) of traffic before file’s indextime (the time when file modification was detected by Splunk).
After this final step – the query either returns zero results (no alert) or shows results in a table similar to this:
As we can see the query not only found piece of malware but detected the offending IP address and shows what kind of activity was performed by this IP address within timerange of an attack.Malware score and malware message showing the reason for trigger (in this case presence of suspicious base64_decode() call) is displayed as well.
Hope you’ll find these ideas and especially the ways Splunk can be utilized as a malware detection and investigation tool useful.
The malware score detection logic can be improved but as a general rule – I’d avoid making regex scan very precise, but rather steer toward making it more generic to avoid missing unknown malware occurences.
Connect with me on LinkedIn
Gleb Esman is currently working as Senior Product Manager for Security/Anti-fraud solutions at Splunk leading efforts to build next generation security products covering advanced fraud cases across multiple industry verticals.
Contact Gleb Esman.