
News and WAT/WET compatibilities #32

Open
wants to merge 10 commits into base: main
27 changes: 27 additions & 0 deletions .devcontainer/devcontainer.json
@@ -0,0 +1,27 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/java
{
"name": "Java",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/java:1-11-bookworm",

"features": {
"ghcr.io/devcontainers/features/java:1": {
"version": "none",
"installMaven": "true",
"installGradle": "false"
}
}

// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],

// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "java -version",

// Configure tool-specific properties.
// "customizations": {},

// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}
12 changes: 12 additions & 0 deletions .github/dependabot.yml
@@ -0,0 +1,12 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for more information:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
# https://containers.dev/guide/dependabot

version: 2
updates:
- package-ecosystem: "devcontainers"
directory: "/"
schedule:
interval: weekly
21 changes: 21 additions & 0 deletions .vscode/launch.json
@@ -0,0 +1,21 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "java",
"name": "Current File",
"request": "launch",
"mainClass": "${file}"
},
{
"type": "java",
"name": "Attach",
"request": "attach",
"hostName": "localhost",
"port": 8000
}
]
}
3 changes: 3 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,3 @@
{
"java.compile.nullAnalysis.mode": "automatic"
}
13 changes: 13 additions & 0 deletions Dockerfile
@@ -0,0 +1,13 @@
FROM maven:3.6.3-jdk-11 AS build
WORKDIR /app
COPY pom.xml .
COPY src ./src
RUN mvn package

FROM spark:3.5.1
WORKDIR /app
COPY --from=build /app/target/*.jar ./target/
COPY --from=build /app/src/script/convert_url_index.sh ./src/script/convert_url_index.sh
VOLUME /app/data
ENV SPARK_ON_YARN="--master local"
ENTRYPOINT ["/app/src/script/convert_url_index.sh"]
2 changes: 1 addition & 1 deletion pom.xml
@@ -12,7 +12,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<java.version>11</java.version>

<spark.version>3.5.1</spark.version>
<spark.core.version>2.12</spark.core.version>
29 changes: 17 additions & 12 deletions src/main/java/org/commoncrawl/spark/CCIndex2Table.java
@@ -17,18 +17,18 @@
package org.commoncrawl.spark;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.commoncrawl.spark.util.CCIndex2FilenameParser;
import org.commoncrawl.spark.util.CCIndex2FilenameParser.FilenameParts;
import org.commoncrawl.spark.util.CCIndex2FilenameParser.FilenameParseError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Convert Common Crawl's URL index into a tabular format.
*/
@@ -39,9 +39,6 @@ public class CCIndex2Table extends IndexTable {

protected static boolean useBuiltinNestedSchema = false;

protected static final Pattern filenameAnalyzer = Pattern
.compile("^(?:common-crawl/)?crawl-data/([^/]+)/segments/([^/]+)/(crawldiagnostics|robotstxt|warc)/");

protected static class CdxLine extends IndexTable.CdxLine {
String redirect;
String digest;
@@ -68,12 +65,16 @@ public CdxLine(String line) throws IOException {
length = getInt("length");
status = getHttpStatus("status");

Matcher m = filenameAnalyzer.matcher(filename);
if (m.find()) {
crawl = m.group(1);
segment = m.group(2);
subset = m.group(3);
} else {
crawl = "unknown";
Contributor:

If we need to support data sets which are not partitioned by crawl, subset and segments, this could also be done by providing a further schema and an adapted table converter, cf. the EOT schema and EOT converter.

Contributor Author:

I will try removing this - it was done for testing, and may not be needed now that I mirror the structure of our crawl data during testing.

segment = "unknown";
subset = "unknown";

try{
final FilenameParts parts = CCIndex2FilenameParser.getParts(filename);
crawl = parts.crawl;
segment = parts.segment;
subset = parts.subset;
} catch (FilenameParseError e) {
LOG.error("Filename not parseable: {}", filename);
}

@@ -109,6 +110,10 @@ public static Row convertCdxLine(String line) {
cdx.crawl, cdx.subset);
} else {
Row h = cdx.uri.getHostName().asRow();
if( h.get(0) == null ) {
Contributor:

What is the objective of first accepting CDX input records with a URI without a hostname ...

Contributor Author (@jt55401, Sep 26, 2024):

Jason will re-test this as well - we should probably remove this. (It's better to let the job fail - because we had bad data, etc)

LOG.error("Failed to parse hostname: " + cdx.uri.getHostName() + " from line:\n\t" + line);
return null;
}
return RowFactory.create(
// SURT and complete URL
cdx.urlkey,
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.commoncrawl.spark.util;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CCIndex2FilenameParser {
protected static final Pattern filenameAnalyzer = Pattern
.compile("^(?:common-crawl/)?crawl-data/([^/]+)/segments/([^/]+)/(crawldiagnostics|robotstxt|warc|wat|wet)/");

// crawl-data/CC-NEWS/2019/01/CC-NEWS-20190101042830-00057.warc.gz
protected static final Pattern newsFilenameAnalyzer = Pattern
.compile("^(?:common-crawl/)?crawl-data/CC-NEWS/(\\d+)/(\\d+)/CC-NEWS-(.+)\\.warc\\.gz");

// Class to encapsulate the extracted crawl, segment, and subset.
public static class FilenameParts {
public String crawl;
public String segment;
public String subset;
}

// Error class if we can't find the crawl, segment, and subset.
public static class FilenameParseError extends Exception {
public FilenameParseError(String message) {
super(message);
}
}

public static FilenameParts getParts(String filename) throws FilenameParseError {
FilenameParts parts = new FilenameParts();
Matcher m = filenameAnalyzer.matcher(filename);
if(m.find()){
Contributor:

Nit-picking: this project adds a space before opening parentheses or brackets. The default Eclipse code style is used. Shall we add a code formatter to the build?

Contributor Author:

default Eclipse code style is used

Ah - sure - do you know (or can you provide a public URL of) what the default Eclipse formatter settings are?
Is it the Google one, or something different?
https://raw.githubusercontent.com/google/styleguide/gh-pages/eclipse-java-google-style.xml

Contributor Author:

Or, food for thought/broader discussion - can we use EditorConfig so any IDE can work well in every language?
https://editorconfig.org/ Probably not a topic to discuss here though.

Contributor:

Thanks for the pointer. I'll have a look. But, yes, let's discuss it separately.

parts.crawl = m.group(1);
parts.segment = m.group(2);
parts.subset = m.group(3);
} else {
Matcher newsParts = newsFilenameAnalyzer.matcher(filename);
if(!newsParts.find()){
throw new FilenameParseError("Filename not parseable (tried normal and news): " + filename);
}
parts.crawl = String.format("CC-NEWS-%s-%s", newsParts.group(1), newsParts.group(2));
parts.segment = newsParts.group(3);
Contributor:

There is (now) no equivalent to the segment in the news crawl. I wouldn't use the WARC timestamp (time when the WARC was started) and its serial number as an equivalent: timestamps and serial number of the segments in the main crawl relate to time of fetch list generation. In the news crawl it's about the fetch time.

If segment is left out, we would need to modify the schema for the news crawl because "segment" is defined as not nullable. Maybe it's better to provide a different schema than to try to force every data set into the same schema independent of its scope, collection method and tools. This was one insight of adapting the CDX-to-Parquet converter for the end-of-term archive. See eot-index-schema.json and EOTIndexTable.java.

Contributor Author:

Yes, I thought much about this as well.

I did it this way for the following reasons:

  • the redaction tools (and IMHO, all our tools) are going to be simpler if we're dealing with same index format.
  • I think while news may have no natural "segment", I don't really see the harm in populating it with some potentially useful data. Perhaps I need to be made aware of what the harm might be. (What do people even use that field for?)
  • I also considered a single value, i.e. "0000000", but, per the previous point, I think the way I did it is potentially a little more useful.

Perhaps it boils down to these options:

  • Leave my solution (probably not)
  • Populate only with serial number (seems a bit better based on what I think you're saying above)
  • Populate with static value (I'm not a fan of useless data, but, I like it better than changing schema)
  • New index format (makes many of our tools & future processes more complex "forever")

I'm going to ask a question below about news as well - that may be a part of the answer. See below.

Contributor:

See the discussion below about the subset "news-warc".

(what do people even use that field for?)

Only very few users use it, for example Henry Thompson in "Improved methodology ...". In order to use it you need to know what it stands for: one part (one 100th) of the fetch list with all data of one segment fetched together in the same time frame (as of now: 3 hours). We might explain it to our users, for example in the schema.
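
To make this concrete, here is a minimal sketch (not part of this PR) of how a user might lean on the segment column with Spark's Java API. The S3 path and column names (warc_segment, crawl, subset) follow the public cc-index table layout discussed further down, but they should be treated as illustrative here, not as the project's documented usage.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SegmentUsageExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SegmentUsageExample")
                .getOrCreate();

        // Illustrative path; see the table location mentioned below.
        Dataset<Row> index = spark.read()
                .parquet("s3a://commoncrawl/cc-index/table/cc-main/warc/");

        // Each segment is one part (one 100th) of a crawl's fetch list,
        // fetched within the same time frame, so grouping by it gives a
        // rough per-time-window breakdown of captures.
        index.filter("crawl = 'CC-MAIN-2018-47' AND subset = 'warc'")
                .groupBy("warc_segment")
                .count()
                .orderBy("warc_segment")
                .show(100, false);

        spark.stop();
    }
}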

parts.subset = "news-warc";
Contributor:

Why shouldn't this be just "warc"? The value in the "crawl" partition column already makes it clear that it's the news crawl and not the main crawl.

Contributor Author:

Yes - I went back and forth on this a few times.
At the end of the day - it came down to a few comments Greg has made - in that we want to be extra careful about how users of the data may be sorting/filtering data, and not making changes that might upset their existing processes.

I'm not sure philosophically if we want to consider the news and main crawls "one thing" or if they should be separate. So, I chose a middle ground, where news is in the same index/schema, but it wouldn't likely get filtered into scope by anyone looking for the "warc" subset. (That's also why I chose to prefix with "news" rather than suffix, in case people might be doing "startswith warc" or "warc%" or something strange.)

I realize people COULD filter news in/out based on the crawl value - but, again, I erred on the side of making the default for existing users "out".

Thoughts?
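
For illustration, a small sketch (not part of this PR) of the kind of existing subset filter this choice is meant to protect. The table path is an assumption, and the queries are hypothetical user code rather than anything shipped with cc-index-table.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SubsetFilterExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SubsetFilterExample")
                .getOrCreate();

        // Illustrative table location.
        Dataset<Row> index = spark.read()
                .parquet("s3a://commoncrawl/cc-index/table/cc-main/warc/");

        // An existing query on the subset partition: rows written with
        // subset = 'news-warc' stay out of scope, so results are unchanged.
        long mainWarc = index.filter("subset = 'warc'").count();

        // Even a prefix match keeps news records out, because 'news-warc'
        // does not start with 'warc'.
        long prefixWarc = index.filter("subset LIKE 'warc%'").count();

        System.out.println("subset = 'warc': " + mainWarc);
        System.out.println("subset LIKE 'warc%': " + prefixWarc);

        spark.stop();
    }
}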

Contributor:

we want to be extra careful about how users of the data may be sorting/filtering data, and not making changes that might upset their existing processes.

I fully agree. There shouldn't be any changes which cannot be handled by schema merging. That is, extending the table is possible, but not changing the semantics of existing columns.

My assumption was that any new table (for CC-NEWS, the old crawls ARC files, but also the WAT/WET files) would break the semantics of the schema. That's why the location is:

s3://commoncrawl/cc-index/table/cc-main/warc/crawl=.../subset=.../

Consequently, a table for CC-NEWS would be on a different path. It would be a separate table which might use a different schema. Given the continuous release of the news crawl the path pattern could be:

s3://commoncrawl/cc-index/table/cc-news/warc/year=.../month=.../

The news crawl has a different collection method: it's sampled by time, not on a global level, with no revisits.

In addition, the cc-main table is already quite big. Simply for practical reasons, I would keep the news crawl and especially the WAT/WET records in a separate table. It's also more practical to announce separate new tables to the users than explain the extension of the existing one by new partitions and how to rephrase queries so that the results do not change.

Over the long term, an independent schema also allows for schema upgrades which focus on the given dataset format or scope: text metrics for WET or news-related metadata (publication time, etc.).
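
As an aside on the schema-merging point above, a minimal sketch of what Spark's Parquet schema merging does and does not cover; the path is illustrative and the code is not part of this PR.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SchemaMergeExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SchemaMergeExample")
                .getOrCreate();

        // With mergeSchema enabled, columns added in newer partitions are
        // unioned into one schema; older partitions return null for them.
        Dataset<Row> merged = spark.read()
                .option("mergeSchema", "true")
                .parquet("s3a://commoncrawl/cc-index/table/cc-main/warc/");

        merged.printSchema();

        // Merging cannot reconcile a changed type or changed meaning of an
        // existing column - that case calls for a separate table/schema.
        spark.stop();
    }
}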

Contributor Author:

OK - I understand what you are saying.

Still not sure I agree though - isn't it better for most users to have a comprehensive index of everything in one place?

@wumpus - I guess you've mentioned a few times these new indexes need not even be public - so, perhaps this is all a moot point if they're for internal use only.

I'm of the mind that a single index for all data is most useful, and there's little harm (and likely benefit) in making them public as well, but, if that is considered ill-advised, just let me know which account the CDX files should end up in - our private/commoncrawl one, or the public dataset one.

Furthermore - if anyone has preferences on the final bucket/location/path within S3, please do let me know.

I'm going to try to do a few small test runs of the cdx job early this week, with a goal of getting a larger job running later in the week.

}
return parts;
}
}
2 changes: 2 additions & 0 deletions src/script/convert_url_index.sh
@@ -60,6 +60,8 @@ $SPARK_HOME/bin/spark-submit \
--conf spark.sql.hive.metastorePartitionPruning=true \
--conf spark.hadoop.parquet.enable.summary-metadata=false \
--conf spark.sql.parquet.outputTimestampType=TIMESTAMP_MILLIS \
--conf "spark.driver.userClassPathFirst=true" \
--conf "spark.executor.userClassPathFirst=true" \
Contributor:

Hmhh. Might be safe, because cc-index-table ships with very few dependencies given that Spark is "provided" and the AWS SDK shades all transitive dependencies. In general, I'd be very cautious with this configuration option, which is still marked as "experimental": conflicts between transitive dependencies from the user jar and those used/required by Spark may cause errors (typically related to serialization or inter-process communication) which are difficult to trace.

Contributor Author (@jt55401, May 31, 2024):

I could not get the job to run to completion without these lines - the Spark environment (as seen in the Dockerfile) was overriding our libraries, and it was causing this error:

Caused by: java.lang.NoSuchMethodError: 'com.google.gson.JsonElement com.google.gson.JsonParser.parseReader(java.io.Reader)'

If you have documentation or hints you can give on how the real environment is set up, I'd appreciate a pointer; I'm happy to adjust.

Contributor:

I know, I've also run into the gson issue. The version used must be the same as (or API-compatible with) that provided by Spark. As said, I'm mostly optimistic that there will be no regressions with user class path first. If not (to be tested), we can roll this back later.

Alternatively, the script already allows adding additional Spark options via the environment variable SPARK_EXTRA_OPTS:

SPARK_EXTRA_OPTS="--conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true" ./src/script/convert_url_index.sh ...

Contributor:

The setup of Spark and the job configuration is in the crawl-tools repository. Ping me if more information is needed.

--class org.commoncrawl.spark.CCIndex2Table $_APPJAR \
--outputCompression=$COMPRS \
--outputFormat=$FORMAT $NESTED \
@@ -0,0 +1,56 @@
package org.commoncrawl.spark;

import static org.junit.jupiter.api.Assertions.assertEquals;
import org.commoncrawl.spark.util.CCIndex2FilenameParser;
import org.commoncrawl.spark.util.CCIndex2FilenameParser.FilenameParts;
import org.commoncrawl.spark.util.CCIndex2FilenameParser.FilenameParseError;
import org.junit.jupiter.api.Test;

public class TestCCIndex2FilenameParser {

@Test
public void testMainWarcFilename() throws FilenameParseError {
String filename = "crawl-data/CC-MAIN-2018-47/segments/1542039741324.15/warc/CC-MAIN-20181113153141-20181113174452-00011.warc.gz";
FilenameParts parts = CCIndex2FilenameParser.getParts(filename);
assertEquals("CC-MAIN-2018-47", parts.crawl);
assertEquals("1542039741324.15", parts.segment);
assertEquals("warc", parts.subset);
}

@Test
public void testMainWat() throws FilenameParseError {
String filename = "crawl-data/CC-MAIN-2018-47/segments/1542039741324.15/wat/CC-MAIN-20181113153141-20181113174452-00011.warc.wat.gz";
FilenameParts parts = CCIndex2FilenameParser.getParts(filename);
assertEquals("CC-MAIN-2018-47", parts.crawl);
assertEquals("1542039741324.15", parts.segment);
assertEquals("wat", parts.subset);
}

@Test
public void testMainWet() throws FilenameParseError {
String filename = "crawl-data/CC-MAIN-2018-47/segments/1542039741016.16/wet/CC-MAIN-20181112172845-20181112194415-00012.warc.wet.gz";
FilenameParts parts = CCIndex2FilenameParser.getParts(filename);
assertEquals("CC-MAIN-2018-47", parts.crawl);
assertEquals("1542039741016.16", parts.segment);
assertEquals("wet", parts.subset);
}

@Test
public void testMainCrawldiagnostics() throws FilenameParseError {
String filename = "crawl-data/CC-MAIN-2018-47/segments/1542039741016.16/crawldiagnostics/CC-MAIN-20181112172845-20181112194415-00012.warc.gz";
FilenameParts parts = CCIndex2FilenameParser.getParts(filename);
assertEquals("CC-MAIN-2018-47", parts.crawl);
assertEquals("1542039741016.16", parts.segment);
assertEquals("crawldiagnostics", parts.subset);
}

@Test
public void testNewsWarcFilename() throws FilenameParseError {
String filename = "crawl-data/CC-NEWS/2019/01/CC-NEWS-20190101042830-00057.warc.gz";
FilenameParts parts = CCIndex2FilenameParser.getParts(filename);
assertEquals("CC-NEWS-2019-01", parts.crawl);
assertEquals("20190101042830-00057", parts.segment);
assertEquals("news-warc", parts.subset);
}

}