From c7aebf51eac6694bba820be49ebec3f5c262d441 Mon Sep 17 00:00:00 2001
From: "release-please[bot]"
<55107282+release-please[bot]@users.noreply.github.com>
Date: Wed, 24 Feb 2021 22:28:06 +0000
Subject: [PATCH 01/47] chore(master): release 0.1.1-SNAPSHOT (#91)
:robot: I have created a release \*beep\* \*boop\*
---
### Updating meta-information for bleeding-edge SNAPSHOT release.
---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
---
pom.xml | 2 +-
versions.txt | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/pom.xml b/pom.xml
index af8ff185..e399314a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,7 +8,7 @@
4.0.0
com.google.cloud
pubsublite-spark-sql-streaming
- 0.1.0
+ 0.1.1-SNAPSHOT
jar
Pub/Sub Lite Spark SQL Streaming
https://github.com/googleapis/java-pubsublite-spark
diff --git a/versions.txt b/versions.txt
index 86dfe658..b62cf13b 100644
--- a/versions.txt
+++ b/versions.txt
@@ -1,5 +1,5 @@
# Format:
# module:released-version:current-version
-pubsublite-spark-sql-streaming:0.1.0:0.1.0
+pubsublite-spark-sql-streaming:0.1.0:0.1.1-SNAPSHOT
com.google.cloud.samples.shared-configuration:1.0.21:1.0.21
From c44c05e21cbbcaf898ac5976de9f171b9faf900e Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Thu, 25 Feb 2021 01:47:30 +0100
Subject: [PATCH 02/47] chore(deps): update dependency
com.google.cloud:libraries-bom to v18 (#92)
---
samples/snapshot/pom.xml | 2 +-
samples/snippets/pom.xml | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index de5d8a34..89d04ba3 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
libraries-bom
- 17.0.0
+ 18.0.0
pom
import
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index eb5ab8b6..8f3e748f 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
libraries-bom
- 17.0.0
+ 18.0.0
pom
import
From 8b8886add0e086c3c31390b7c2e5bc478f065390 Mon Sep 17 00:00:00 2001
From: Yoshi Automation Bot
Date: Wed, 24 Feb 2021 16:56:05 -0800
Subject: [PATCH 03/47] chore: regenerate README (#93)
This PR was generated using Autosynth. :rainbow:
Log from Synthtool
```
2021-02-25 00:50:33,400 synthtool [DEBUG] > Executing /root/.cache/synthtool/java-pubsublite-spark/.github/readme/synth.py.
On branch autosynth-readme
nothing to commit, working tree clean
2021-02-25 00:50:34,949 synthtool [DEBUG] > Wrote metadata to .github/readme/synth.metadata/synth.metadata.
```
Full log will be available here:
https://source.cloud.google.com/results/invocations/cb1a92e3-4929-40fe-b7d2-5404d7321c41/targets
- [ ] To automatically regenerate this PR, check this box.
---
.github/readme/synth.metadata/synth.metadata | 4 ++--
README.md | 6 +++---
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/.github/readme/synth.metadata/synth.metadata b/.github/readme/synth.metadata/synth.metadata
index 1014012f..58cce6f4 100644
--- a/.github/readme/synth.metadata/synth.metadata
+++ b/.github/readme/synth.metadata/synth.metadata
@@ -4,14 +4,14 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/java-pubsublite-spark.git",
- "sha": "5726724b43c44c269478c37a389ef825da8083ac"
+ "sha": "c44c05e21cbbcaf898ac5976de9f171b9faf900e"
}
},
{
"git": {
"name": "synthtool",
"remote": "https://github.com/googleapis/synthtool.git",
- "sha": "79ab0b44a2cc7d803d07c107f9faf07729fc4012"
+ "sha": "0199c79b8324fba66476300824aa931788c47e2d"
}
}
]
diff --git a/README.md b/README.md
index aebb07ec..e1fdabb5 100644
--- a/README.md
+++ b/README.md
@@ -20,18 +20,18 @@ If you are using Maven, add this to your pom.xml file:
com.google.cloud
pubsublite-spark-sql-streaming
- 0.0.0
+ 0.1.0
```
If you are using Gradle without BOM, add this to your dependencies
```Groovy
-compile 'com.google.cloud:pubsublite-spark-sql-streaming:0.0.0'
+compile 'com.google.cloud:pubsublite-spark-sql-streaming:0.1.0'
```
If you are using SBT, add this to your dependencies
```Scala
-libraryDependencies += "com.google.cloud" % "pubsublite-spark-sql-streaming" % "0.0.0"
+libraryDependencies += "com.google.cloud" % "pubsublite-spark-sql-streaming" % "0.1.0"
```
## Authentication
From 508b90c3fd0a6045d548c1c6336fe4b542a27d07 Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Thu, 25 Feb 2021 22:41:48 +0100
Subject: [PATCH 04/47] deps: update dependency
com.google.cloud:google-cloud-pubsublite to v0.11.0 (#95)
---
pom.xml | 2 +-
samples/snapshot/pom.xml | 2 +-
samples/snippets/pom.xml | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/pom.xml b/pom.xml
index e399314a..ffcaa4c6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@
com.google.cloud
google-cloud-pubsublite
- 0.10.0
+ 0.11.0
com.google.api.grpc
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 89d04ba3..3172c288 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -44,7 +44,7 @@
com.google.cloud
google-cloud-pubsublite
- 0.10.0
+ 0.11.0
junit
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index 8f3e748f..97d5b00b 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -44,7 +44,7 @@
com.google.cloud
google-cloud-pubsublite
- 0.10.0
+ 0.11.0
junit
From d9b9289160d50ca4b44447287b887249190db9fd Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Thu, 25 Feb 2021 23:36:19 +0100
Subject: [PATCH 05/47] deps: update dependency
com.google.api.grpc:proto-google-cloud-pubsublite-v1 to v0.11.0 (#94)
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index ffcaa4c6..dabf27ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@
com.google.api.grpc
proto-google-cloud-pubsublite-v1
- 0.10.0
+ 0.11.0
com.google.guava
From 435624723d5ffdbfc803ac19e034d65cea33986e Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Thu, 25 Feb 2021 23:50:52 +0100
Subject: [PATCH 06/47] deps: update dependency
com.google.cloud:google-cloud-pubsublite-parent to v0.11.0 (#96)
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index dabf27ee..4200f709 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
com.google.cloud
google-cloud-pubsublite-parent
- 0.10.0
+ 0.11.0
4.0.0
com.google.cloud
From 9187ff382714b810e94758f3ba1e89a75ae99caf Mon Sep 17 00:00:00 2001
From: Tianzi Cai
Date: Fri, 26 Feb 2021 10:37:50 -0800
Subject: [PATCH 07/47] docs: update client lib documentation link (#98)
---
.repo-metadata.json | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/.repo-metadata.json b/.repo-metadata.json
index 93ec87a2..4cdef959 100644
--- a/.repo-metadata.json
+++ b/.repo-metadata.json
@@ -3,7 +3,7 @@
"name_pretty": "Pub/Sub Lite Spark Connector",
"product_documentation": "https://cloud.google.com/pubsub/lite/docs",
"api_description": "Pub/Sub Lite is a zonal, real-time messaging service that lets you send and receive messages between independent applications. You can manually configure the throughput and storage capacity for Pub/Sub Lite systems.",
- "client_documentation": "https://googleapis.dev/java/google-cloud-pubsublite/latest/index.html",
+ "client_documentation": "https://googleapis.dev/java/pubsublite-spark-sql-streaming/latest/index.html",
"release_level": "alpha",
"transport": "grpc",
"requires_billing": true,
@@ -14,4 +14,4 @@
"distribution_name": "com.google.cloud:pubsublite-spark-sql-streaming",
"codeowner_team": "@googleapis/api-pubsub",
"api_id": "pubsublite.googleapis.com"
-}
\ No newline at end of file
+}
From 91bdd037a9bc20cfff094fec0f31a1461b6f4978 Mon Sep 17 00:00:00 2001
From: Yoshi Automation Bot
Date: Fri, 26 Feb 2021 10:46:04 -0800
Subject: [PATCH 08/47] chore: regenerate README (#99)
This PR was generated using Autosynth. :rainbow:
Log from Synthtool
```
2021-02-26 18:40:00,410 synthtool [DEBUG] > Executing /root/.cache/synthtool/java-pubsublite-spark/.github/readme/synth.py.
On branch autosynth-readme
nothing to commit, working tree clean
2021-02-26 18:40:02,026 synthtool [DEBUG] > Wrote metadata to .github/readme/synth.metadata/synth.metadata.
```
Full log will be available here:
https://source.cloud.google.com/results/invocations/40128cfb-4872-47ca-8564-2ceca3934c06/targets
- [ ] To automatically regenerate this PR, check this box.
---
.github/readme/synth.metadata/synth.metadata | 4 ++--
README.md | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/.github/readme/synth.metadata/synth.metadata b/.github/readme/synth.metadata/synth.metadata
index 58cce6f4..9791a1fa 100644
--- a/.github/readme/synth.metadata/synth.metadata
+++ b/.github/readme/synth.metadata/synth.metadata
@@ -4,14 +4,14 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/java-pubsublite-spark.git",
- "sha": "c44c05e21cbbcaf898ac5976de9f171b9faf900e"
+ "sha": "9187ff382714b810e94758f3ba1e89a75ae99caf"
}
},
{
"git": {
"name": "synthtool",
"remote": "https://github.com/googleapis/synthtool.git",
- "sha": "0199c79b8324fba66476300824aa931788c47e2d"
+ "sha": "8c5628b86cfa8386de7b8fc1675e6b528b552d57"
}
}
]
diff --git a/README.md b/README.md
index e1fdabb5..2e549cbd 100644
--- a/README.md
+++ b/README.md
@@ -226,7 +226,7 @@ Java 11 | [![Kokoro CI][kokoro-badge-image-5]][kokoro-badge-link-5]
Java is a registered trademark of Oracle and/or its affiliates.
[product-docs]: https://cloud.google.com/pubsub/lite/docs
-[javadocs]: https://googleapis.dev/java/google-cloud-pubsublite/latest/index.html
+[javadocs]: https://googleapis.dev/java/pubsublite-spark-sql-streaming/latest/index.html
[kokoro-badge-image-1]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java7.svg
[kokoro-badge-link-1]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java7.html
[kokoro-badge-image-2]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java8.svg
From f82087b17a6c6e44af235a7e6e7a4632874aef42 Mon Sep 17 00:00:00 2001
From: jiangmichaellll <40044148+jiangmichaellll@users.noreply.github.com>
Date: Fri, 26 Feb 2021 17:50:00 -0500
Subject: [PATCH 09/47] docs: Add maven central link. (#100)
* update
* Update .readme-partials.yaml
Co-authored-by: Tianzi Cai
Co-authored-by: Tianzi Cai
---
.readme-partials.yaml | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/.readme-partials.yaml b/.readme-partials.yaml
index b627f59c..c6a90ce8 100644
--- a/.readme-partials.yaml
+++ b/.readme-partials.yaml
@@ -16,8 +16,7 @@ custom_content: |
## Downloading and Using the Connector
-
- The connector will be available from the Maven Central repository. It can be used using the `--packages` option or the `spark.jars.packages` configuration property.
+ The connector is available from the [Maven Central repository](https://search.maven.org/artifact/com.google.cloud/pubsublite-spark-sql-streaming). You can download and pass it in the `--packages` option when using the `spark-submit` command or set it via the `spark.jars.packages` [configuration property](https://spark.apache.org/docs/latest/configuration.html#available-properties).
## Compatibility
| Connector version | Spark version |
From 0d8461567b04249f2a7dad94ebc1b67dbcd66039 Mon Sep 17 00:00:00 2001
From: Yoshi Automation Bot
Date: Fri, 26 Feb 2021 14:58:04 -0800
Subject: [PATCH 10/47] chore: regenerate README (#101)
This PR was generated using Autosynth. :rainbow:
Log from Synthtool
```
2021-02-26 22:52:17,406 synthtool [DEBUG] > Executing /root/.cache/synthtool/java-pubsublite-spark/.github/readme/synth.py.
On branch autosynth-readme
nothing to commit, working tree clean
2021-02-26 22:52:18,972 synthtool [DEBUG] > Wrote metadata to .github/readme/synth.metadata/synth.metadata.
```
Full log will be available here:
https://source.cloud.google.com/results/invocations/5b054a0f-ff73-4e8e-88e0-8df8c4f356d9/targets
- [ ] To automatically regenerate this PR, check this box.
---
.github/readme/synth.metadata/synth.metadata | 2 +-
README.md | 3 +--
2 files changed, 2 insertions(+), 3 deletions(-)
diff --git a/.github/readme/synth.metadata/synth.metadata b/.github/readme/synth.metadata/synth.metadata
index 9791a1fa..b892c10e 100644
--- a/.github/readme/synth.metadata/synth.metadata
+++ b/.github/readme/synth.metadata/synth.metadata
@@ -4,7 +4,7 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/java-pubsublite-spark.git",
- "sha": "9187ff382714b810e94758f3ba1e89a75ae99caf"
+ "sha": "f82087b17a6c6e44af235a7e6e7a4632874aef42"
}
},
{
diff --git a/README.md b/README.md
index 2e549cbd..02cdf64a 100644
--- a/README.md
+++ b/README.md
@@ -85,8 +85,7 @@ and manual Spark installations.
## Downloading and Using the Connector
-
- The connector will be available from the Maven Central repository. It can be used using the `--packages` option or the `spark.jars.packages` configuration property.
+ The connector is available from the [Maven Central repository](https://search.maven.org/artifact/com.google.cloud/pubsublite-spark-sql-streaming). You can download and pass it in the `--packages` option when using the `spark-submit` command or set it via the `spark.jars.packages` [configuration property](https://spark.apache.org/docs/latest/configuration.html#available-properties).
## Compatibility
| Connector version | Spark version |
From 86c8b7e9295e5abbb4f491ef1a5295a1ac9b498c Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Tue, 2 Mar 2021 02:04:48 +0100
Subject: [PATCH 11/47] deps: update dependency
com.google.cloud:google-cloud-pubsublite to v0.11.1 (#103)
---
pom.xml | 2 +-
samples/snapshot/pom.xml | 2 +-
samples/snippets/pom.xml | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/pom.xml b/pom.xml
index 4200f709..7852a9d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@
com.google.cloud
google-cloud-pubsublite
- 0.11.0
+ 0.11.1
com.google.api.grpc
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 3172c288..bf200a62 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -44,7 +44,7 @@
com.google.cloud
google-cloud-pubsublite
- 0.11.0
+ 0.11.1
junit
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index 97d5b00b..d2a93c3d 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -44,7 +44,7 @@
com.google.cloud
google-cloud-pubsublite
- 0.11.0
+ 0.11.1
junit
From 4812cbc6710f2a894045b50b8f5f1245e3b80196 Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Tue, 2 Mar 2021 02:05:07 +0100
Subject: [PATCH 12/47] deps: update dependency
com.google.api.grpc:proto-google-cloud-pubsublite-v1 to v0.11.1 (#102)
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 7852a9d8..2b22dd0a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@
com.google.api.grpc
proto-google-cloud-pubsublite-v1
- 0.11.0
+ 0.11.1
com.google.guava
From bb73ca2900d3cea9cd6d5c920c0f09871fff73aa Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Tue, 2 Mar 2021 02:17:02 +0100
Subject: [PATCH 13/47] deps: update dependency
com.google.cloud:google-cloud-pubsublite-parent to v0.11.1 (#104)
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 2b22dd0a..6cab0960 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
com.google.cloud
google-cloud-pubsublite-parent
- 0.11.0
+ 0.11.1
4.0.0
com.google.cloud
From 4de89f33e7717d2394b5ba63f6086673316818c2 Mon Sep 17 00:00:00 2001
From: Yoshi Automation Bot
Date: Tue, 2 Mar 2021 16:03:59 -0800
Subject: [PATCH 14/47] chore: remove docLava v2 doc generation (#105)
* chore: remove docLava v2 doc generation
Not using this anymore and it is causing some issues in pubsublite
* chore: removing v2 doclava bucket config
Source-Author: Emily Ball
Source-Date: Tue Mar 2 10:13:11 2021 -0800
Source-Repo: googleapis/synthtool
Source-Sha: 21da7d9fa02f6916d9f87cf4072b3547b5c72eb5
Source-Link: https://github.com/googleapis/synthtool/commit/21da7d9fa02f6916d9f87cf4072b3547b5c72eb5
---
.kokoro/release/publish_javadoc.cfg | 8 +-------
.kokoro/release/publish_javadoc.sh | 19 -------------------
synth.metadata | 4 ++--
3 files changed, 3 insertions(+), 28 deletions(-)
diff --git a/.kokoro/release/publish_javadoc.cfg b/.kokoro/release/publish_javadoc.cfg
index c0ea28d2..7a9aba0e 100644
--- a/.kokoro/release/publish_javadoc.cfg
+++ b/.kokoro/release/publish_javadoc.cfg
@@ -7,12 +7,6 @@ env_vars: {
value: "docs-staging"
}
-# cloud-rad staging
-env_vars: {
- key: "STAGING_BUCKET_V2"
- value: "docs-staging-v2-staging"
-}
-
env_vars: {
key: "TRAMPOLINE_BUILD_FILE"
value: "github/java-pubsublite-spark/.kokoro/release/publish_javadoc.sh"
@@ -26,4 +20,4 @@ before_action {
keyname: "docuploader_service_account"
}
}
-}
\ No newline at end of file
+}
diff --git a/.kokoro/release/publish_javadoc.sh b/.kokoro/release/publish_javadoc.sh
index c0ecd34e..efc49913 100755
--- a/.kokoro/release/publish_javadoc.sh
+++ b/.kokoro/release/publish_javadoc.sh
@@ -56,22 +56,3 @@ python3 -m docuploader create-metadata \
python3 -m docuploader upload . \
--credentials ${CREDENTIALS} \
--staging-bucket ${STAGING_BUCKET}
-
-popd
-
-# V2 due to problems w/ the released javadoc plugin doclava, Java 8 is required. Beware of accidental updates.
-
-mvn clean site -B -q -Ddevsite.template="${KOKORO_GFILE_DIR}/java/"
-
-pushd target/devsite/reference
-
-# create metadata
-python3 -m docuploader create-metadata \
- --name ${NAME} \
- --version ${VERSION} \
- --language java
-
-# upload docs to staging bucket
-python3 -m docuploader upload . \
- --credentials ${CREDENTIALS} \
- --staging-bucket ${STAGING_BUCKET_V2}
diff --git a/synth.metadata b/synth.metadata
index 276f2637..e51d110b 100644
--- a/synth.metadata
+++ b/synth.metadata
@@ -4,14 +4,14 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/java-pubsublite-spark.git",
- "sha": "c4944f2f96d134ae116791b091d7be77e8393aae"
+ "sha": "bb73ca2900d3cea9cd6d5c920c0f09871fff73aa"
}
},
{
"git": {
"name": "synthtool",
"remote": "https://github.com/googleapis/synthtool.git",
- "sha": "6946fd71ae9215b0e7ae188f5057df765ee6d7d2"
+ "sha": "21da7d9fa02f6916d9f87cf4072b3547b5c72eb5"
}
}
],
From 6e5138546c9446db4d8133c91ec207da46702266 Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Wed, 3 Mar 2021 20:36:52 +0100
Subject: [PATCH 15/47] chore(deps): update dependency
com.google.cloud:libraries-bom to v18.1.0 (#106)
[](https://renovatebot.com)
This PR contains the following updates:
| Package | Change | Age | Adoption | Passing | Confidence |
|---|---|---|---|---|---|
| [com.google.cloud:libraries-bom](https://togithub.com/GoogleCloudPlatform/cloud-opensource-java) | `18.0.0` -> `18.1.0` | [](https://docs.renovatebot.com/merge-confidence/) | [](https://docs.renovatebot.com/merge-confidence/) | [](https://docs.renovatebot.com/merge-confidence/) | [](https://docs.renovatebot.com/merge-confidence/) |
---
### Renovate configuration
:date: **Schedule**: At any time (no schedule defined).
:vertical_traffic_light: **Automerge**: Disabled by config. Please merge this manually once you are satisfied.
:recycle: **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox.
:no_bell: **Ignore**: Close this PR and you won't be reminded about this update again.
---
- [ ] If you want to rebase/retry this PR, check this box
---
This PR has been generated by [WhiteSource Renovate](https://renovate.whitesourcesoftware.com). View repository job log [here](https://app.renovatebot.com/dashboard#github/googleapis/java-pubsublite-spark).
---
samples/snapshot/pom.xml | 2 +-
samples/snippets/pom.xml | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index bf200a62..04c9212f 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
libraries-bom
- 18.0.0
+ 18.1.0
pom
import
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index d2a93c3d..5dd560ac 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
libraries-bom
- 18.0.0
+ 18.1.0
pom
import
From 759c0da48867ea2b3978c700859cc3c4b8043bf5 Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Thu, 4 Mar 2021 20:26:47 +0100
Subject: [PATCH 16/47] chore(deps): update dependency
com.google.cloud:libraries-bom to v19 (#107)
---
samples/snapshot/pom.xml | 2 +-
samples/snippets/pom.xml | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 04c9212f..7792e9b7 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
libraries-bom
- 18.1.0
+ 19.0.0
pom
import
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index 5dd560ac..6a71d668 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
libraries-bom
- 18.1.0
+ 19.0.0
pom
import
From d75274e1e1d8debd2c72eda92b2bce7f687b8bc2 Mon Sep 17 00:00:00 2001
From: Yoshi Automation Bot
Date: Thu, 4 Mar 2021 15:54:09 -0800
Subject: [PATCH 17/47] chore: copy README to docfx-yml dir (#108)
This PR was generated using Autosynth. :rainbow:
Synth log will be available here:
https://source.cloud.google.com/results/invocations/ed7b8a82-0394-45bb-ab76-b43a96195edd/targets
- [ ] To automatically regenerate this PR, check this box.
Source-Link: https://github.com/googleapis/synthtool/commit/d0bdade9a962042dc0f770cf631086f3db59b5b0
---
.kokoro/release/publish_javadoc11.sh | 5 ++++-
synth.metadata | 4 ++--
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/.kokoro/release/publish_javadoc11.sh b/.kokoro/release/publish_javadoc11.sh
index cf735ff7..4101fd58 100755
--- a/.kokoro/release/publish_javadoc11.sh
+++ b/.kokoro/release/publish_javadoc11.sh
@@ -40,6 +40,9 @@ export VERSION=$(grep ${NAME}: versions.txt | cut -d: -f3)
# generate yml
mvn clean site -B -q -P docFX
+# copy README to docfx-yml dir and rename index.md
+cp README.md target/docfx-yml/index.md
+
pushd target/docfx-yml
# create metadata
@@ -52,4 +55,4 @@ python3 -m docuploader create-metadata \
python3 -m docuploader upload . \
--credentials ${CREDENTIALS} \
--staging-bucket ${STAGING_BUCKET_V2} \
- --destination-prefix docfx-
+ --destination-prefix docfx
diff --git a/synth.metadata b/synth.metadata
index e51d110b..93682520 100644
--- a/synth.metadata
+++ b/synth.metadata
@@ -4,14 +4,14 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/java-pubsublite-spark.git",
- "sha": "bb73ca2900d3cea9cd6d5c920c0f09871fff73aa"
+ "sha": "759c0da48867ea2b3978c700859cc3c4b8043bf5"
}
},
{
"git": {
"name": "synthtool",
"remote": "https://github.com/googleapis/synthtool.git",
- "sha": "21da7d9fa02f6916d9f87cf4072b3547b5c72eb5"
+ "sha": "d0bdade9a962042dc0f770cf631086f3db59b5b0"
}
}
],
From 1bf772a275e76d6b7229d628b72d5dce4f5c8bc5 Mon Sep 17 00:00:00 2001
From: jiangmichaellll <40044148+jiangmichaellll@users.noreply.github.com>
Date: Tue, 9 Mar 2021 20:12:02 -0500
Subject: [PATCH 18/47] docs: Update gcs public available link (#109)
---
.readme-partials.yaml | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/.readme-partials.yaml b/.readme-partials.yaml
index c6a90ce8..9c84e179 100644
--- a/.readme-partials.yaml
+++ b/.readme-partials.yaml
@@ -15,8 +15,11 @@ custom_content: |
```
## Downloading and Using the Connector
-
- The connector is available from the [Maven Central repository](https://search.maven.org/artifact/com.google.cloud/pubsublite-spark-sql-streaming). You can download and pass it in the `--packages` option when using the `spark-submit` command or set it via the `spark.jars.packages` [configuration property](https://spark.apache.org/docs/latest/configuration.html#available-properties).
+ The latest version of the connector is publicly available in the following link:
+ | Connector version | Link |
+ | --- | --- |
+ | 0.1.0 | `gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-0.1.0-with-dependencies.jar`([HTTP link](https://storage.googleapis.com/spark-lib/pubsublite/pubsublite-spark-sql-streaming-0.1.0-with-dependencies.jar)) |
+ The connector is also available from the [Maven Central repository](https://search.maven.org/artifact/com.google.cloud/pubsublite-spark-sql-streaming). You can download and pass it in the `--jars` option when using the `spark-submit` command.
## Compatibility
| Connector version | Spark version |
From 011e9d567cd0109790f916f545c8ad50138ddbe5 Mon Sep 17 00:00:00 2001
From: Yoshi Automation Bot
Date: Tue, 9 Mar 2021 17:22:02 -0800
Subject: [PATCH 19/47] chore: regenerate README (#112)
This PR was generated using Autosynth. :rainbow:
Log from Synthtool
```
2021-03-10 01:16:35,757 synthtool [DEBUG] > Executing /root/.cache/synthtool/java-pubsublite-spark/.github/readme/synth.py.
On branch autosynth-readme
nothing to commit, working tree clean
2021-03-10 01:16:36,659 synthtool [DEBUG] > Wrote metadata to .github/readme/synth.metadata/synth.metadata.
```
Full log will be available here:
https://source.cloud.google.com/results/invocations/dd59780c-4231-48a1-83dd-be9f1c53d16e/targets
- [ ] To automatically regenerate this PR, check this box.
---
.github/readme/synth.metadata/synth.metadata | 4 ++--
README.md | 7 +++++--
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/.github/readme/synth.metadata/synth.metadata b/.github/readme/synth.metadata/synth.metadata
index b892c10e..6d0f9a81 100644
--- a/.github/readme/synth.metadata/synth.metadata
+++ b/.github/readme/synth.metadata/synth.metadata
@@ -4,14 +4,14 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/java-pubsublite-spark.git",
- "sha": "f82087b17a6c6e44af235a7e6e7a4632874aef42"
+ "sha": "1bf772a275e76d6b7229d628b72d5dce4f5c8bc5"
}
},
{
"git": {
"name": "synthtool",
"remote": "https://github.com/googleapis/synthtool.git",
- "sha": "8c5628b86cfa8386de7b8fc1675e6b528b552d57"
+ "sha": "e5fa6d93e42918dd4a000a80b92be23f5f4c6ac7"
}
}
]
diff --git a/README.md b/README.md
index 02cdf64a..628f6445 100644
--- a/README.md
+++ b/README.md
@@ -84,8 +84,11 @@ and manual Spark installations.
```
## Downloading and Using the Connector
-
- The connector is available from the [Maven Central repository](https://search.maven.org/artifact/com.google.cloud/pubsublite-spark-sql-streaming). You can download and pass it in the `--packages` option when using the `spark-submit` command or set it via the `spark.jars.packages` [configuration property](https://spark.apache.org/docs/latest/configuration.html#available-properties).
+ The latest version of the connector is publicly available in the following link:
+ | Connector version | Link |
+ | --- | --- |
+ | 0.1.0 | `gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-0.1.0-with-dependencies.jar`([HTTP link](https://storage.googleapis.com/spark-lib/pubsublite/pubsublite-spark-sql-streaming-0.1.0-with-dependencies.jar)) |
+ The connector is also available from the [Maven Central repository](https://search.maven.org/artifact/com.google.cloud/pubsublite-spark-sql-streaming). You can download and pass it in the `--jars` option when using the `spark-submit` command.
## Compatibility
| Connector version | Spark version |
From 84d851233db07c5a2af9bbad644b98ee36f7d298 Mon Sep 17 00:00:00 2001
From: Yoshi Automation Bot
Date: Wed, 10 Mar 2021 14:34:35 -0800
Subject: [PATCH 20/47] build(java): update autorelease title check in response
to the new multi release branch changes (#113)
Source-Author: Stephanie Wang
Source-Date: Wed Mar 10 14:40:03 2021 -0500
Source-Repo: googleapis/synthtool
Source-Sha: 0b064d767537e0675fc053e53fca473c5c701fb8
Source-Link: https://github.com/googleapis/synthtool/commit/0b064d767537e0675fc053e53fca473c5c701fb8
---
.github/workflows/auto-release.yaml | 4 ++--
synth.metadata | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/.github/workflows/auto-release.yaml b/.github/workflows/auto-release.yaml
index 7c8816a7..9b4fd4d8 100644
--- a/.github/workflows/auto-release.yaml
+++ b/.github/workflows/auto-release.yaml
@@ -16,8 +16,8 @@ jobs:
return;
}
- // only approve PRs like "chore(master): release "
- if ( !context.payload.pull_request.title.startsWith("chore(master): release") ) {
+ // only approve PRs like "chore: release "
+ if ( !context.payload.pull_request.title.startsWith("chore: release") ) {
return;
}
diff --git a/synth.metadata b/synth.metadata
index 93682520..7a54af80 100644
--- a/synth.metadata
+++ b/synth.metadata
@@ -4,14 +4,14 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/java-pubsublite-spark.git",
- "sha": "759c0da48867ea2b3978c700859cc3c4b8043bf5"
+ "sha": "011e9d567cd0109790f916f545c8ad50138ddbe5"
}
},
{
"git": {
"name": "synthtool",
"remote": "https://github.com/googleapis/synthtool.git",
- "sha": "d0bdade9a962042dc0f770cf631086f3db59b5b0"
+ "sha": "0b064d767537e0675fc053e53fca473c5c701fb8"
}
}
],
From b1a9dcd030d55bf2d4d32e4818aa2ac64b5d8e46 Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Wed, 17 Mar 2021 20:19:05 +0100
Subject: [PATCH 21/47] chore(deps): update dependency
com.google.cloud:libraries-bom to v19.1.0 (#116)
---
samples/snapshot/pom.xml | 2 +-
samples/snippets/pom.xml | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 7792e9b7..93297c63 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
libraries-bom
- 19.0.0
+ 19.1.0
pom
import
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index 6a71d668..3ca85f45 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
libraries-bom
- 19.0.0
+ 19.1.0
pom
import
From ea3596fad872e26b5b0157192762d037d163577f Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Fri, 19 Mar 2021 22:30:14 +0100
Subject: [PATCH 22/47] deps: update dependency
com.google.cloud:google-cloud-pubsublite-parent to v0.12.0 (#120)
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 6cab0960..b5a238dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
com.google.cloud
google-cloud-pubsublite-parent
- 0.11.1
+ 0.12.0
4.0.0
com.google.cloud
From 821449eca7259bc36c5ae8f0d12a3c54af1484e7 Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Fri, 19 Mar 2021 22:30:32 +0100
Subject: [PATCH 23/47] deps: update dependency
com.google.cloud:google-cloud-pubsublite to v0.12.0 (#119)
---
pom.xml | 2 +-
samples/snapshot/pom.xml | 2 +-
samples/snippets/pom.xml | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/pom.xml b/pom.xml
index b5a238dc..ff688c69 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@
com.google.cloud
google-cloud-pubsublite
- 0.11.1
+ 0.12.0
com.google.api.grpc
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 93297c63..aeab8dfa 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -44,7 +44,7 @@
com.google.cloud
google-cloud-pubsublite
- 0.11.1
+ 0.12.0
junit
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index 3ca85f45..fbae9274 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -44,7 +44,7 @@
com.google.cloud
google-cloud-pubsublite
- 0.11.1
+ 0.12.0
junit
From 880da1bf953526cd40e4b736a898185751c7bb27 Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Fri, 19 Mar 2021 22:30:49 +0100
Subject: [PATCH 24/47] deps: update dependency
com.google.api.grpc:proto-google-cloud-pubsublite-v1 to v0.12.0 (#118)
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index ff688c69..c377b164 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@
com.google.api.grpc
proto-google-cloud-pubsublite-v1
- 0.11.1
+ 0.12.0
com.google.guava
From b7237f88d04b474c44b33d329234e6ce016ec134 Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Fri, 19 Mar 2021 22:31:07 +0100
Subject: [PATCH 25/47] chore(deps): update dependency
com.google.cloud:libraries-bom to v19.2.1 (#117)
---
samples/snapshot/pom.xml | 2 +-
samples/snippets/pom.xml | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index aeab8dfa..f08c3d43 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
libraries-bom
- 19.1.0
+ 19.2.1
pom
import
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index fbae9274..b301d2d0 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
libraries-bom
- 19.1.0
+ 19.2.1
pom
import
From 20f336639c261ddb3b61d0bd14f02e6ea5146377 Mon Sep 17 00:00:00 2001
From: jiangmichaellll <40044148+jiangmichaellll@users.noreply.github.com>
Date: Fri, 19 Mar 2021 17:40:46 -0400
Subject: [PATCH 26/47] feat: Supports topic partition increase. (#115)
---
clirr-ignored-differences.xml | 15 ++
.../spark/CachedPartitionCountReader.java | 47 ++++++
.../spark/LimitingHeadOffsetReader.java | 20 ++-
.../spark/MultiPartitionCommitterImpl.java | 73 ++++++++-
.../spark/PartitionCountReader.java | 26 ++++
.../pubsublite/spark/PslContinuousReader.java | 13 +-
.../cloud/pubsublite/spark/PslDataSource.java | 24 +--
.../pubsublite/spark/PslMicroBatchReader.java | 58 +++----
.../spark/LimitingHeadOffsetReaderTest.java | 33 +++-
.../MultiPartitionCommitterImplTest.java | 141 ++++++++++++------
.../spark/PslContinuousReaderTest.java | 16 +-
.../spark/PslMicroBatchReaderTest.java | 99 +++++++++---
.../cloud/pubsublite/spark/TestingUtils.java | 43 ++++++
13 files changed, 488 insertions(+), 120 deletions(-)
create mode 100644 clirr-ignored-differences.xml
create mode 100644 src/main/java/com/google/cloud/pubsublite/spark/CachedPartitionCountReader.java
create mode 100644 src/main/java/com/google/cloud/pubsublite/spark/PartitionCountReader.java
create mode 100644 src/test/java/com/google/cloud/pubsublite/spark/TestingUtils.java
diff --git a/clirr-ignored-differences.xml b/clirr-ignored-differences.xml
new file mode 100644
index 00000000..1aa41e4f
--- /dev/null
+++ b/clirr-ignored-differences.xml
@@ -0,0 +1,15 @@
+
+
+
+
+ 7004
+ com/google/cloud/pubsublite/spark/*Reader
+ *
+
+
+ 7005
+ com/google/cloud/pubsublite/spark/*Reader
+ *
+ *
+
+
\ No newline at end of file
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/CachedPartitionCountReader.java b/src/main/java/com/google/cloud/pubsublite/spark/CachedPartitionCountReader.java
new file mode 100644
index 00000000..35555805
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/CachedPartitionCountReader.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.PartitionLookupUtils;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.ThreadSafe;
+
+@ThreadSafe
+public class CachedPartitionCountReader implements PartitionCountReader {
+ private final AdminClient adminClient;
+ private final Supplier supplier;
+
+ public CachedPartitionCountReader(AdminClient adminClient, TopicPath topicPath) {
+ this.adminClient = adminClient;
+ this.supplier =
+ Suppliers.memoizeWithExpiration(
+ () -> PartitionLookupUtils.numPartitions(topicPath, adminClient), 1, TimeUnit.MINUTES);
+ }
+
+ @Override
+ public void close() {
+ adminClient.close();
+ }
+
+ public int getPartitionCount() {
+ return supplier.get();
+ }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java b/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
index 5954492f..7bad0ffc 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java
@@ -27,7 +27,9 @@
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.MoreExecutors;
+import java.io.Closeable;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -40,18 +42,22 @@
* offsets for the topic at most once per minute.
*/
public class LimitingHeadOffsetReader implements PerTopicHeadOffsetReader {
+ private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
private final TopicStatsClient topicStatsClient;
private final TopicPath topic;
- private final long topicPartitionCount;
+ private final PartitionCountReader partitionCountReader;
private final AsyncLoadingCache cachedHeadOffsets;
@VisibleForTesting
public LimitingHeadOffsetReader(
- TopicStatsClient topicStatsClient, TopicPath topic, long topicPartitionCount, Ticker ticker) {
+ TopicStatsClient topicStatsClient,
+ TopicPath topic,
+ PartitionCountReader partitionCountReader,
+ Ticker ticker) {
this.topicStatsClient = topicStatsClient;
this.topic = topic;
- this.topicPartitionCount = topicPartitionCount;
+ this.partitionCountReader = partitionCountReader;
this.cachedHeadOffsets =
Caffeine.newBuilder()
.ticker(ticker)
@@ -82,7 +88,7 @@ public void onSuccess(Cursor c) {
@Override
public PslSourceOffset getHeadOffset() {
Set keySet = new HashSet<>();
- for (int i = 0; i < topicPartitionCount; i++) {
+ for (int i = 0; i < partitionCountReader.getPartitionCount(); i++) {
keySet.add(Partition.of(i));
}
CompletableFuture
+
+ org.scala-lang.modules
+ scala-java8-compat_2.11
+ 0.9.1
+
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/Constants.java b/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
index cac4337a..9ad29b23 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/Constants.java
@@ -17,7 +17,12 @@
package com.google.cloud.pubsublite.spark;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -26,22 +31,33 @@ public class Constants {
public static long DEFAULT_BYTES_OUTSTANDING = 50_000_000;
public static long DEFAULT_MESSAGES_OUTSTANDING = Long.MAX_VALUE;
public static long DEFAULT_MAX_MESSAGES_PER_BATCH = Long.MAX_VALUE;
+
+ public static ArrayType ATTRIBUTES_PER_KEY_DATATYPE =
+ DataTypes.createArrayType(DataTypes.BinaryType);
+ public static MapType ATTRIBUTES_DATATYPE =
+ DataTypes.createMapType(DataTypes.StringType, ATTRIBUTES_PER_KEY_DATATYPE);
+ public static Map PUBLISH_FIELD_TYPES =
+ ImmutableMap.of(
+ "key", DataTypes.BinaryType,
+ "data", DataTypes.BinaryType,
+ "attributes", ATTRIBUTES_DATATYPE,
+ "event_timestamp", DataTypes.TimestampType);
public static StructType DEFAULT_SCHEMA =
new StructType(
new StructField[] {
new StructField("subscription", DataTypes.StringType, false, Metadata.empty()),
new StructField("partition", DataTypes.LongType, false, Metadata.empty()),
new StructField("offset", DataTypes.LongType, false, Metadata.empty()),
- new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
- new StructField("data", DataTypes.BinaryType, false, Metadata.empty()),
+ new StructField("key", PUBLISH_FIELD_TYPES.get("key"), false, Metadata.empty()),
+ new StructField("data", PUBLISH_FIELD_TYPES.get("data"), false, Metadata.empty()),
new StructField("publish_timestamp", DataTypes.TimestampType, false, Metadata.empty()),
- new StructField("event_timestamp", DataTypes.TimestampType, true, Metadata.empty()),
new StructField(
- "attributes",
- DataTypes.createMapType(
- DataTypes.StringType, DataTypes.createArrayType(DataTypes.BinaryType)),
+ "event_timestamp",
+ PUBLISH_FIELD_TYPES.get("event_timestamp"),
true,
- Metadata.empty())
+ Metadata.empty()),
+ new StructField(
+ "attributes", PUBLISH_FIELD_TYPES.get("attributes"), true, Metadata.empty())
});
public static final PubsubContext.Framework FRAMEWORK = PubsubContext.Framework.of("SPARK");
@@ -52,6 +68,7 @@ public class Constants {
"pubsublite.flowcontrol.byteoutstandingperpartition";
public static String MESSAGES_OUTSTANDING_CONFIG_KEY =
"pubsublite.flowcontrol.messageoutstandingperparition";
+ public static String TOPIC_CONFIG_KEY = "pubsublite.topic";
public static String SUBSCRIPTION_CONFIG_KEY = "pubsublite.subscription";
public static String CREDENTIALS_KEY_CONFIG_KEY = "gcp.credentials.key";
}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
index 65953031..ad2ca3da 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java
@@ -22,6 +22,9 @@
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
+import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
+import com.google.cloud.pubsublite.spark.internal.PartitionCountReader;
+import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Arrays;
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java
index 08a96ee8..2ef2535d 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java
@@ -23,6 +23,9 @@
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.spark.internal.CachedPartitionCountReader;
+import com.google.cloud.pubsublite.spark.internal.LimitingHeadOffsetReader;
+import com.google.cloud.pubsublite.spark.internal.PartitionCountReader;
import java.util.Objects;
import java.util.Optional;
import org.apache.spark.sql.sources.DataSourceRegister;
@@ -30,13 +33,20 @@
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
+import org.apache.spark.sql.sources.v2.StreamWriteSupport;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;
@AutoService(DataSourceRegister.class)
public final class PslDataSource
- implements DataSourceV2, ContinuousReadSupport, MicroBatchReadSupport, DataSourceRegister {
+ implements DataSourceV2,
+ ContinuousReadSupport,
+ MicroBatchReadSupport,
+ StreamWriteSupport,
+ DataSourceRegister {
@Override
public String shortName() {
@@ -51,23 +61,24 @@ public ContinuousReader createContinuousReader(
"PubSub Lite uses fixed schema and custom schema is not allowed");
}
- PslDataSourceOptions pslDataSourceOptions =
- PslDataSourceOptions.fromSparkDataSourceOptions(options);
- SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
+ PslReadDataSourceOptions pslReadDataSourceOptions =
+ PslReadDataSourceOptions.fromSparkDataSourceOptions(options);
+ SubscriptionPath subscriptionPath = pslReadDataSourceOptions.subscriptionPath();
TopicPath topicPath;
- try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
+ try (AdminClient adminClient = pslReadDataSourceOptions.newAdminClient()) {
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
PartitionCountReader partitionCountReader =
- new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
+ new CachedPartitionCountReader(pslReadDataSourceOptions.newAdminClient(), topicPath);
return new PslContinuousReader(
- pslDataSourceOptions.newCursorClient(),
- pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
- pslDataSourceOptions.getSubscriberFactory(),
+ pslReadDataSourceOptions.newCursorClient(),
+ pslReadDataSourceOptions.newMultiPartitionCommitter(
+ partitionCountReader.getPartitionCount()),
+ pslReadDataSourceOptions.getSubscriberFactory(),
subscriptionPath,
- Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
+ Objects.requireNonNull(pslReadDataSourceOptions.flowControlSettings()),
partitionCountReader);
}
@@ -79,28 +90,38 @@ public MicroBatchReader createMicroBatchReader(
"PubSub Lite uses fixed schema and custom schema is not allowed");
}
- PslDataSourceOptions pslDataSourceOptions =
- PslDataSourceOptions.fromSparkDataSourceOptions(options);
- SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
+ PslReadDataSourceOptions pslReadDataSourceOptions =
+ PslReadDataSourceOptions.fromSparkDataSourceOptions(options);
+ SubscriptionPath subscriptionPath = pslReadDataSourceOptions.subscriptionPath();
TopicPath topicPath;
- try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
+ try (AdminClient adminClient = pslReadDataSourceOptions.newAdminClient()) {
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
PartitionCountReader partitionCountReader =
- new CachedPartitionCountReader(pslDataSourceOptions.newAdminClient(), topicPath);
+ new CachedPartitionCountReader(pslReadDataSourceOptions.newAdminClient(), topicPath);
return new PslMicroBatchReader(
- pslDataSourceOptions.newCursorClient(),
- pslDataSourceOptions.newMultiPartitionCommitter(partitionCountReader.getPartitionCount()),
- pslDataSourceOptions.getSubscriberFactory(),
+ pslReadDataSourceOptions.newCursorClient(),
+ pslReadDataSourceOptions.newMultiPartitionCommitter(
+ partitionCountReader.getPartitionCount()),
+ pslReadDataSourceOptions.getSubscriberFactory(),
new LimitingHeadOffsetReader(
- pslDataSourceOptions.newTopicStatsClient(),
+ pslReadDataSourceOptions.newTopicStatsClient(),
topicPath,
partitionCountReader,
Ticker.systemTicker()),
subscriptionPath,
- Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
- pslDataSourceOptions.maxMessagesPerBatch());
+ Objects.requireNonNull(pslReadDataSourceOptions.flowControlSettings()),
+ pslReadDataSourceOptions.maxMessagesPerBatch());
+ }
+
+ @Override
+ public StreamWriter createStreamWriter(
+ String queryId, StructType schema, OutputMode mode, DataSourceOptions options) {
+ PslSparkUtils.verifyWriteInputSchema(schema);
+ PslWriteDataSourceOptions pslWriteDataSourceOptions =
+ PslWriteDataSourceOptions.fromSparkDataSourceOptions(options);
+ return new PslStreamWriter(schema, pslWriteDataSourceOptions);
}
}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataWriter.java b/src/main/java/com/google/cloud/pubsublite/spark/PslDataWriter.java
new file mode 100644
index 00000000..631fb2d3
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslDataWriter.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiService;
+import com.google.cloud.pubsublite.MessageMetadata;
+import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.spark.internal.PublisherFactory;
+import com.google.common.flogger.GoogleLogger;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+
+public class PslDataWriter implements DataWriter {
+
+ private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
+
+ private final long partitionId, taskId, epochId;
+ private final StructType inputSchema;
+ private final PublisherFactory publisherFactory;
+
+ @GuardedBy("this")
+ private Optional> publisher = Optional.empty();
+
+ @GuardedBy("this")
+ private final List> futures = new ArrayList<>();
+
+ public PslDataWriter(
+ long partitionId,
+ long taskId,
+ long epochId,
+ StructType schema,
+ PublisherFactory publisherFactory) {
+ this.partitionId = partitionId;
+ this.taskId = taskId;
+ this.epochId = epochId;
+ this.inputSchema = schema;
+ this.publisherFactory = publisherFactory;
+ }
+
+ @Override
+ public synchronized void write(InternalRow record) {
+ if (!publisher.isPresent() || publisher.get().state() != ApiService.State.RUNNING) {
+ publisher = Optional.of(publisherFactory.newPublisher());
+ }
+ futures.add(
+ publisher
+ .get()
+ .publish(Objects.requireNonNull(PslSparkUtils.toPubSubMessage(inputSchema, record))));
+ }
+
+ @Override
+ public synchronized WriterCommitMessage commit() throws IOException {
+ for (ApiFuture f : futures) {
+ try {
+ f.get();
+ } catch (InterruptedException | ExecutionException e) {
+ publisher = Optional.empty();
+ throw new IOException(e);
+ }
+ }
+ log.atInfo().log(
+ "All writes for partitionId:%d, taskId:%d, epochId:%d succeeded, committing...",
+ partitionId, taskId, epochId);
+ return PslWriterCommitMessage.create(futures.size());
+ }
+
+ @Override
+ public synchronized void abort() {
+ log.atWarning().log(
+ "One or more writes for partitionId:%d, taskId:%d, epochId:%d failed, aborted.",
+ partitionId, taskId, epochId);
+ }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataWriterFactory.java b/src/main/java/com/google/cloud/pubsublite/spark/PslDataWriterFactory.java
new file mode 100644
index 00000000..12d95921
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslDataWriterFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.google.cloud.pubsublite.spark.internal.CachedPublishers;
+import com.google.cloud.pubsublite.spark.internal.PublisherFactory;
+import java.io.Serializable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.types.StructType;
+
+public class PslDataWriterFactory implements Serializable, DataWriterFactory {
+ private static final long serialVersionUID = -6904546364310978844L;
+
+ private static final CachedPublishers CACHED_PUBLISHERS = new CachedPublishers();
+
+ private final StructType inputSchema;
+ private final PslWriteDataSourceOptions writeOptions;
+
+ public PslDataWriterFactory(StructType inputSchema, PslWriteDataSourceOptions writeOptions) {
+ this.inputSchema = inputSchema;
+ this.writeOptions = writeOptions;
+ }
+
+ @Override
+ public DataWriter createDataWriter(int partitionId, long taskId, long epochId) {
+ PublisherFactory pf = () -> CACHED_PUBLISHERS.getOrCreate(writeOptions);
+ return new PslDataWriter(partitionId, taskId, epochId, inputSchema, pf);
+ }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
index b2a346c0..a0f0dfee 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java
@@ -24,6 +24,9 @@
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
+import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
+import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
+import com.google.cloud.pubsublite.spark.internal.PerTopicHeadOffsetReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java b/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java
similarity index 92%
rename from src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java
rename to src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java
index 380e022a..f5987788 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslReadDataSourceOptions.java
@@ -33,6 +33,10 @@
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.ServiceClients;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
+import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
+import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitterImpl;
+import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
+import com.google.cloud.pubsublite.spark.internal.PslCredentialsProvider;
import com.google.cloud.pubsublite.v1.AdminServiceClient;
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
import com.google.cloud.pubsublite.v1.CursorServiceClient;
@@ -47,7 +51,7 @@
import org.apache.spark.sql.sources.v2.DataSourceOptions;
@AutoValue
-public abstract class PslDataSourceOptions implements Serializable {
+public abstract class PslReadDataSourceOptions implements Serializable {
private static final long serialVersionUID = 2680059304693561607L;
@Nullable
@@ -60,7 +64,7 @@ public abstract class PslDataSourceOptions implements Serializable {
public abstract long maxMessagesPerBatch();
public static Builder builder() {
- return new AutoValue_PslDataSourceOptions.Builder()
+ return new AutoValue_PslReadDataSourceOptions.Builder()
.setCredentialsKey(null)
.setMaxMessagesPerBatch(Constants.DEFAULT_MAX_MESSAGES_PER_BATCH)
.setFlowControlSettings(
@@ -70,7 +74,7 @@ public static Builder builder() {
.build());
}
- public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions options) {
+ public static PslReadDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions options) {
if (!options.get(Constants.SUBSCRIPTION_CONFIG_KEY).isPresent()) {
throw new IllegalArgumentException(Constants.SUBSCRIPTION_CONFIG_KEY + " is required.");
}
@@ -115,7 +119,7 @@ public abstract static class Builder {
public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);
- public abstract PslDataSourceOptions build();
+ public abstract PslReadDataSourceOptions build();
}
MultiPartitionCommitter newMultiPartitionCommitter(long topicPartitionCount) {
@@ -135,7 +139,7 @@ PartitionSubscriberFactory getSubscriberFactory() {
PubsubContext context = PubsubContext.of(Constants.FRAMEWORK);
SubscriberServiceSettings.Builder settingsBuilder =
SubscriberServiceSettings.newBuilder()
- .setCredentialsProvider(new PslCredentialsProvider(this));
+ .setCredentialsProvider(new PslCredentialsProvider(credentialsKey()));
ServiceClients.addDefaultMetadata(
context, RoutingMetadata.of(this.subscriptionPath(), partition), settingsBuilder);
try {
@@ -161,7 +165,7 @@ private CursorServiceClient newCursorServiceClient() {
addDefaultSettings(
this.subscriptionPath().location().region(),
CursorServiceSettings.newBuilder()
- .setCredentialsProvider(new PslCredentialsProvider(this))));
+ .setCredentialsProvider(new PslCredentialsProvider(credentialsKey()))));
} catch (IOException e) {
throw new IllegalStateException("Unable to create CursorServiceClient.");
}
@@ -181,7 +185,7 @@ private AdminServiceClient newAdminServiceClient() {
addDefaultSettings(
this.subscriptionPath().location().region(),
AdminServiceSettings.newBuilder()
- .setCredentialsProvider(new PslCredentialsProvider(this))));
+ .setCredentialsProvider(new PslCredentialsProvider(credentialsKey()))));
} catch (IOException e) {
throw new IllegalStateException("Unable to create AdminServiceClient.");
}
@@ -201,7 +205,7 @@ private TopicStatsServiceClient newTopicStatsServiceClient() {
addDefaultSettings(
this.subscriptionPath().location().region(),
TopicStatsServiceSettings.newBuilder()
- .setCredentialsProvider(new PslCredentialsProvider(this))));
+ .setCredentialsProvider(new PslCredentialsProvider(credentialsKey()))));
} catch (IOException e) {
throw new IllegalStateException("Unable to create TopicStatsServiceClient.");
}
diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
index 1d54fe19..2510315a 100644
--- a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
+++ b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java
@@ -19,12 +19,16 @@
import static com.google.common.base.Preconditions.checkArgument;
import static scala.collection.JavaConverters.asScalaBufferConverter;
+import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CursorClient;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
+import com.google.common.flogger.GoogleLogger;
import com.google.common.math.LongMath;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
@@ -34,15 +38,29 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.ByteArray;
import org.apache.spark.unsafe.types.UTF8String;
+import scala.Option;
+import scala.compat.java8.functionConverterImpls.FromJavaBiConsumer;
public class PslSparkUtils {
- private static ArrayBasedMapData convertAttributesToSparkMap(
+
+ private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
+
+ @VisibleForTesting
+ public static ArrayBasedMapData convertAttributesToSparkMap(
ListMultimap attributeMap) {
List keyList = new ArrayList<>();
@@ -83,6 +101,97 @@ public static InternalRow toInternalRow(
return InternalRow.apply(asScalaBufferConverter(list).asScala());
}
+ @SuppressWarnings("unchecked")
+ private static void extractVal(
+ StructType inputSchema,
+ InternalRow row,
+ String fieldName,
+ DataType expectedDataType,
+ Consumer consumer) {
+ Option